diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/HermesMetrics.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/HermesMetrics.java index a975b3d481..993546c978 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/HermesMetrics.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/metric/HermesMetrics.java @@ -164,6 +164,11 @@ public void registerGauge(String name, Gauge gauge) { } } + public void unregisterGauge(String name) { + String path = pathCompiler.compile(name); + metricRegistry.remove(path); + } + private String metricRegistryName(String metricDisplayName, TopicName topicName, String subscription) { PathContext pathContext = PathContext.pathContext() .withGroup(escapeDots(topicName.getGroupName())) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SupervisorConfiguration.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SupervisorConfiguration.java index 121034dedb..9aee1e717c 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SupervisorConfiguration.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/SupervisorConfiguration.java @@ -11,6 +11,7 @@ import pl.allegro.tech.hermes.common.kafka.offset.SubscriptionOffsetChangeIndicator; import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper; import pl.allegro.tech.hermes.common.metric.HermesMetrics; +import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.TargetWeightCalculationStrategy.UnknownTargetWeightCalculationStrategyException; import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.WeightedWorkBalancingProperties; import pl.allegro.tech.hermes.consumers.config.WorkloadProperties.WorkBalancingStrategy.UnknownWorkBalancingStrategyException; import pl.allegro.tech.hermes.consumers.consumer.ConsumerAuthorizationHandler; @@ -39,16 +40,20 @@ import pl.allegro.tech.hermes.consumers.supervisor.workload.ClusterAssignmentCache; import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentCache; import pl.allegro.tech.hermes.consumers.supervisor.workload.ConsumerAssignmentRegistry; +import pl.allegro.tech.hermes.consumers.supervisor.workload.NoOpBalancingListener; import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkBalancer; import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadSupervisor; import pl.allegro.tech.hermes.consumers.supervisor.workload.selective.SelectiveWorkBalancer; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.AvgTargetWeightCalculator; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ConsumerNodeLoadRegistry; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.CurrentLoadProvider; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.NoOpConsumerNodeLoadRegistry; -import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.NoOpSubscriptionProfileRegistry; -import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileProvider; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ScoringTargetWeightCalculator; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfileRegistry; -import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfilesCalculator; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.TargetWeightCalculator; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkBalancer; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkBalancingListener; +import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkloadMetrics; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ZookeeperConsumerNodeLoadRegistry; import pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.ZookeeperSubscriptionProfileRegistry; import pl.allegro.tech.hermes.domain.notifications.InternalNotificationsBus; @@ -119,7 +124,8 @@ public WorkloadSupervisor workloadSupervisor(InternalNotificationsBus notificati @Bean public WorkBalancer workBalancer(WorkloadProperties workloadProperties, Clock clock, - SubscriptionProfileProvider subscriptionProfileProvider) { + CurrentLoadProvider currentLoadProvider, + TargetWeightCalculator targetWeightCalculator) { switch (workloadProperties.getWorkBalancingStrategy()) { case SELECTIVE: return new SelectiveWorkBalancer(); @@ -129,7 +135,8 @@ public WorkBalancer workBalancer(WorkloadProperties workloadProperties, clock, weightedWorkBalancingProperties.getStabilizationWindowSize(), weightedWorkBalancingProperties.getMinSignificantChangePercent(), - subscriptionProfileProvider + currentLoadProvider, + targetWeightCalculator ); } throw new UnknownWorkBalancingStrategyException(); @@ -168,42 +175,75 @@ public ConsumerNodeLoadRegistry consumerNodeLoadRegistry(CuratorFramework curato } @Bean - public SubscriptionProfilesCalculator subscriptionProfilesCalculator(ConsumerNodeLoadRegistry consumerNodeLoadRegistry, - SubscriptionProfileRegistry subscriptionProfileRegistry, - WorkloadProperties workloadProperties, - Clock clock) { - return new SubscriptionProfilesCalculator( - consumerNodeLoadRegistry, - subscriptionProfileRegistry, - clock, - workloadProperties.getWeightedWorkBalancing().getWeightWindowSize() - ); + public TargetWeightCalculator targetWeightCalculator(WorkloadProperties workloadProperties, + WeightedWorkloadMetrics weightedWorkloadMetrics, + Clock clock) { + WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing(); + switch (weightedWorkBalancing.getTargetWeightCalculationStrategy()) { + case AVG: + return new AvgTargetWeightCalculator(weightedWorkloadMetrics); + case SCORING: + return new ScoringTargetWeightCalculator( + weightedWorkloadMetrics, + clock, + weightedWorkBalancing.getWeightWindowSize(), + weightedWorkBalancing.getScoringGain() + ); + } + throw new UnknownTargetWeightCalculationStrategyException(); } @Bean - public SubscriptionProfileRegistry subscriptionProfileRegistry(CuratorFramework curator, - SubscriptionIds subscriptionIds, - ZookeeperPaths zookeeperPaths, - WorkloadProperties workloadProperties, - KafkaClustersProperties kafkaClustersProperties, - DatacenterNameProvider datacenterNameProvider) { + public BalancingListener balancingListener(ConsumerNodeLoadRegistry consumerNodeLoadRegistry, + SubscriptionProfileRegistry subscriptionProfileRegistry, + WorkloadProperties workloadProperties, + CurrentLoadProvider currentLoadProvider, + WeightedWorkloadMetrics weightedWorkloadMetrics, + Clock clock) { switch (workloadProperties.getWorkBalancingStrategy()) { case SELECTIVE: - return new NoOpSubscriptionProfileRegistry(); + return new NoOpBalancingListener(); case WEIGHTED: - KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider); - WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing(); - return new ZookeeperSubscriptionProfileRegistry( - curator, - subscriptionIds, - zookeeperPaths, - kafkaProperties.getClusterName(), - weightedWorkBalancing.getSubscriptionProfilesEncoderBufferSizeBytes() + return new WeightedWorkBalancingListener( + consumerNodeLoadRegistry, + subscriptionProfileRegistry, + currentLoadProvider, + weightedWorkloadMetrics, + clock, + workloadProperties.getWeightedWorkBalancing().getWeightWindowSize() ); } throw new UnknownWorkBalancingStrategyException(); } + @Bean + public CurrentLoadProvider currentLoadProvider() { + return new CurrentLoadProvider(); + } + + @Bean + public WeightedWorkloadMetrics weightedWorkloadMetrics(HermesMetrics hermesMetrics) { + return new WeightedWorkloadMetrics(hermesMetrics); + } + + @Bean + public SubscriptionProfileRegistry subscriptionProfileRegistry(CuratorFramework curator, + SubscriptionIds subscriptionIds, + ZookeeperPaths zookeeperPaths, + WorkloadProperties workloadProperties, + KafkaClustersProperties kafkaClustersProperties, + DatacenterNameProvider datacenterNameProvider) { + KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider); + WeightedWorkBalancingProperties weightedWorkBalancing = workloadProperties.getWeightedWorkBalancing(); + return new ZookeeperSubscriptionProfileRegistry( + curator, + subscriptionIds, + zookeeperPaths, + kafkaProperties.getClusterName(), + weightedWorkBalancing.getSubscriptionProfilesEncoderBufferSizeBytes() + ); + } + @Bean public Retransmitter retransmitter(SubscriptionOffsetChangeIndicator subscriptionOffsetChangeIndicator, KafkaClustersProperties kafkaClustersProperties, diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/WorkloadProperties.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/WorkloadProperties.java index 9d84510915..358636b4bb 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/WorkloadProperties.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/config/WorkloadProperties.java @@ -144,6 +144,10 @@ public static class WeightedWorkBalancingProperties { private Duration weightWindowSize = Duration.ofMinutes(15); + private TargetWeightCalculationStrategy targetWeightCalculationStrategy = TargetWeightCalculationStrategy.AVG; + + private double scoringGain = 1.0d; + public int getConsumerLoadEncoderBufferSizeBytes() { return consumerLoadEncoderBufferSizeBytes; } @@ -191,6 +195,22 @@ public Duration getWeightWindowSize() { public void setWeightWindowSize(Duration weightWindowSize) { this.weightWindowSize = weightWindowSize; } + + public TargetWeightCalculationStrategy getTargetWeightCalculationStrategy() { + return targetWeightCalculationStrategy; + } + + public void setTargetWeightCalculationStrategy(TargetWeightCalculationStrategy targetWeightCalculationStrategy) { + this.targetWeightCalculationStrategy = targetWeightCalculationStrategy; + } + + public double getScoringGain() { + return scoringGain; + } + + public void setScoringGain(double scoringGain) { + this.scoringGain = scoringGain; + } } public enum WorkBalancingStrategy { @@ -204,4 +224,16 @@ public UnknownWorkBalancingStrategyException() { } } } + + public enum TargetWeightCalculationStrategy { + AVG, + SCORING; + + public static class UnknownTargetWeightCalculationStrategyException extends InternalProcessingException { + + public UnknownTargetWeightCalculationStrategyException() { + super("Unknown target weight calculation strategy. Use one of: " + Arrays.toString(values())); + } + } + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingJob.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingJob.java index 7c39ff7b29..1a3da3c7dd 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingJob.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingJob.java @@ -109,6 +109,7 @@ public void run() { } } else { balancingMetrics.reset(); + balancingListener.onBalancingSkipped(); } } catch (Exception e) { logger.error("Caught exception when running balancing job", e); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingListener.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingListener.java index 2a57bfcb03..5d8aa9a3f7 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingListener.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/BalancingListener.java @@ -7,4 +7,6 @@ public interface BalancingListener { void onBeforeBalancing(List activeConsumers); void onAfterBalancing(WorkDistributionChanges changes); + + void onBalancingSkipped(); } diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java similarity index 84% rename from hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java rename to hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java index 37d51350ca..6c4b08f0fc 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/NoOpBalancingListener.java @@ -13,4 +13,9 @@ public void onBeforeBalancing(List activeConsumers) { public void onAfterBalancing(WorkDistributionChanges changes) { } + + @Override + public void onBalancingSkipped() { + + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/AvgTargetWeightCalculator.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/AvgTargetWeightCalculator.java new file mode 100644 index 0000000000..49c984a2e1 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/AvgTargetWeightCalculator.java @@ -0,0 +1,33 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import java.util.Collection; +import java.util.Map; + +import static java.util.stream.Collectors.toMap; + +public class AvgTargetWeightCalculator implements TargetWeightCalculator { + + private final WeightedWorkloadMetrics metrics; + + public AvgTargetWeightCalculator(WeightedWorkloadMetrics metrics) { + this.metrics = metrics; + } + + @Override + public Map calculate(Collection consumers) { + if (consumers.isEmpty()) { + return Map.of(); + } + + metrics.reportCurrentWeights(consumers); + Weight sum = consumers.stream() + .map(ConsumerNode::getWeight) + .reduce(Weight.ZERO, Weight::add); + Weight average = sum.divide(consumers.size()); + + Map newWeights = consumers.stream() + .collect(toMap(ConsumerNode::getConsumerId, ignore -> average)); + metrics.reportProposedWeights(newWeights); + return newWeights; + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNode.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNode.java index 742224f417..875a63188b 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNode.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNode.java @@ -12,12 +12,14 @@ class ConsumerNode { static Comparator LIGHTEST_CONSUMER_FIRST = comparing(ConsumerNode::getWeight); private final String consumerId; + private final ConsumerNodeLoad initialLoad; private final int maxSubscriptionsPerConsumer; private final Set tasks = new HashSet<>(); private Weight weight = Weight.ZERO; - ConsumerNode(String consumerId, int maxSubscriptionsPerConsumer) { + ConsumerNode(String consumerId, ConsumerNodeLoad initialLoad, int maxSubscriptionsPerConsumer) { this.consumerId = consumerId; + this.initialLoad = initialLoad; this.maxSubscriptionsPerConsumer = maxSubscriptionsPerConsumer; } @@ -60,6 +62,10 @@ Weight getWeight() { return weight; } + ConsumerNodeLoad getInitialLoad() { + return initialLoad; + } + @Override public String toString() { return consumerId; diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoad.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoad.java index 272c03c281..4c6000b515 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoad.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoad.java @@ -3,15 +3,16 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import java.util.Map; -import java.util.Objects; class ConsumerNodeLoad { - static final ConsumerNodeLoad UNDEFINED = new ConsumerNodeLoad(Map.of()); + static final ConsumerNodeLoad UNDEFINED = new ConsumerNodeLoad(-1d, Map.of()); + private final double cpuUtilization; private final Map loadPerSubscription; - ConsumerNodeLoad(Map loadPerSubscription) { + ConsumerNodeLoad(double cpuUtilization, Map loadPerSubscription) { + this.cpuUtilization = cpuUtilization; this.loadPerSubscription = loadPerSubscription; } @@ -19,20 +20,17 @@ Map getLoadPerSubscription() { return loadPerSubscription; } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ConsumerNodeLoad that = (ConsumerNodeLoad) o; - return Objects.equals(loadPerSubscription, that.loadPerSubscription); + double getCpuUtilization() { + return cpuUtilization; } - @Override - public int hashCode() { - return Objects.hash(loadPerSubscription); + double sumOperationsPerSecond() { + return loadPerSubscription.values().stream() + .mapToDouble(SubscriptionLoad::getOperationsPerSecond) + .sum(); + } + + boolean isDefined() { + return cpuUtilization != UNDEFINED.getCpuUtilization(); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadDecoder.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadDecoder.java index c2edb0a8a7..fc7332e42f 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadDecoder.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadDecoder.java @@ -43,7 +43,7 @@ ConsumerNodeLoad decode(byte[] bytes) { body.wrap(buffer, header.encodedLength(), header.blockLength(), header.version()); - return new ConsumerNodeLoad(decodeSubscriptionLoads(body)); + return new ConsumerNodeLoad(body.cpuUtilization(), decodeSubscriptionLoads(body)); } private Map decodeSubscriptionLoads(ConsumerLoadDecoder body) { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadEncoder.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadEncoder.java index 3ed4f3d38c..83c20cc474 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadEncoder.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ConsumerNodeLoadEncoder.java @@ -23,14 +23,15 @@ class ConsumerNodeLoadEncoder { this.buffer = new ExpandableDirectByteBuffer(bufferSize); } - byte[] encode(ConsumerNodeLoad metrics) { - Map subscriptionLoads = mapToSubscriptionIds(metrics); + byte[] encode(ConsumerNodeLoad consumerNodeLoad) { + Map subscriptionLoads = mapToSubscriptionIds(consumerNodeLoad); MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder(); ConsumerLoadEncoder body = new ConsumerLoadEncoder() .wrapAndApplyHeader(buffer, 0, headerEncoder); SubscriptionsEncoder loadPerSubscriptionEncoder = body + .cpuUtilization(consumerNodeLoad.getCpuUtilization()) .subscriptionsCount(subscriptionLoads.size()); for (Map.Entry entry : subscriptionLoads.entrySet()) { diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/CurrentLoadProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/CurrentLoadProvider.java new file mode 100644 index 0000000000..e23674e2b9 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/CurrentLoadProvider.java @@ -0,0 +1,32 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class CurrentLoadProvider { + + private final Map consumerLoads = new ConcurrentHashMap<>(); + private volatile SubscriptionProfiles profiles = SubscriptionProfiles.EMPTY; + + SubscriptionProfiles getProfiles() { + return profiles; + } + + ConsumerNodeLoad getConsumerNodeLoad(String consumerId) { + return consumerLoads.getOrDefault(consumerId, ConsumerNodeLoad.UNDEFINED); + } + + void updateConsumerNodeLoads(Map newConsumerLoads) { + consumerLoads.clear(); + consumerLoads.putAll(newConsumerLoads); + } + + void updateProfiles(SubscriptionProfiles newProfiles) { + profiles = newProfiles; + } + + void clear() { + consumerLoads.clear(); + profiles = SubscriptionProfiles.EMPTY; + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ExponentiallyWeightedMovingAverage.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ExponentiallyWeightedMovingAverage.java new file mode 100644 index 0000000000..4c2782059c --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ExponentiallyWeightedMovingAverage.java @@ -0,0 +1,32 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +class ExponentiallyWeightedMovingAverage { + + private final Duration windowSize; + + private Instant previousUpdateTimestamp; + private double currentAverage = 0d; + + ExponentiallyWeightedMovingAverage(Duration windowSize) { + this.windowSize = windowSize; + } + + double update(double sample, Instant now) { + if (previousUpdateTimestamp == null) { + currentAverage = sample; + } else { + // This calculation is done in the same way as the Linux load average is calculated. + // See: https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works + Duration elapsed = Duration.between(previousUpdateTimestamp, now); + long elapsedMillis = Math.max(TimeUnit.MILLISECONDS.convert(elapsed), 0); + double alpha = 1.0 - Math.exp(-1.0 * ((double) elapsedMillis / windowSize.toMillis())); + currentAverage = (sample * alpha) + (currentAverage * (1.0 - alpha)); + } + previousUpdateTimestamp = now; + return currentAverage; + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/NoOpSubscriptionProfileRegistry.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/NoOpSubscriptionProfileRegistry.java deleted file mode 100644 index eb8a281466..0000000000 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/NoOpSubscriptionProfileRegistry.java +++ /dev/null @@ -1,16 +0,0 @@ -package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; - -import static pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfiles.EMPTY; - -public class NoOpSubscriptionProfileRegistry implements SubscriptionProfileRegistry { - - @Override - public SubscriptionProfiles fetch() { - return EMPTY; - } - - @Override - public void persist(SubscriptionProfiles profiles) { - - } -} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculator.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculator.java new file mode 100644 index 0000000000..f86fde3597 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculator.java @@ -0,0 +1,139 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import java.time.Clock; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static java.util.stream.Collectors.toSet; + +public class ScoringTargetWeightCalculator implements TargetWeightCalculator { + + private static final double MIN_SCORE = 0.01d; + private static final double MAX_SCORE = 1.0d; + + private final WeightedWorkloadMetrics metrics; + private final Clock clock; + private final Duration scoringWindowSize; + private final double scoringGain; + private final Map scores = new HashMap<>(); + + public ScoringTargetWeightCalculator(WeightedWorkloadMetrics metrics, Clock clock, Duration scoringWindowSize, double scoringGain) { + this.metrics = metrics; + this.clock = clock; + this.scoringWindowSize = scoringWindowSize; + this.scoringGain = scoringGain; + } + + @Override + public Map calculate(Collection consumers) { + removeScoresForInactiveConsumers(consumers); + + metrics.reportCurrentWeights(consumers); + Map loadPerConsumer = mapConsumerIdToLoad(consumers); + double targetCpuUtilization = calculateTargetCpuUtilization(loadPerConsumer); + Map currentScores = calculateCurrentScores(loadPerConsumer); + + Map newScores = new HashMap<>(); + for (Map.Entry entry : currentScores.entrySet()) { + String consumerId = entry.getKey(); + double cpuUtilization = loadPerConsumer.get(consumerId).getCpuUtilization(); + double error = targetCpuUtilization - cpuUtilization; + double currentScore = entry.getValue(); + double newScore = calculateNewScore(consumerId, currentScore, error); + newScores.put(consumerId, newScore); + metrics.reportCurrentScore(consumerId, currentScore); + metrics.reportProposedScore(consumerId, newScore); + metrics.reportScoringError(consumerId, error); + } + + Map newWeights = calculateWeights(consumers, newScores); + metrics.reportProposedWeights(newWeights); + return newWeights; + } + + private void removeScoresForInactiveConsumers(Collection consumers) { + Set consumerIds = consumers.stream().map(ConsumerNode::getConsumerId).collect(toSet()); + scores.entrySet().removeIf(e -> !consumerIds.contains(e.getKey())); + } + + private Map mapConsumerIdToLoad(Collection consumers) { + return consumers.stream() + .collect(toMap(ConsumerNode::getConsumerId, ConsumerNode::getInitialLoad)); + } + + private double calculateTargetCpuUtilization(Map loadPerConsumer) { + return loadPerConsumer.values().stream() + .filter(ConsumerNodeLoad::isDefined) + .mapToDouble(ConsumerNodeLoad::getCpuUtilization) + .average() + .orElse(0d); + } + + private Map calculateCurrentScores(Map loadPerConsumer) { + Map opsPerConsumer = loadPerConsumer.entrySet().stream() + .filter(e -> e.getValue().isDefined()) + .collect(toMap(Map.Entry::getKey, e -> e.getValue().sumOperationsPerSecond())); + double opsSum = opsPerConsumer.values().stream() + .mapToDouble(ops -> ops) + .sum(); + return opsPerConsumer.entrySet().stream() + .collect(toMap(Map.Entry::getKey, e -> calculateCurrentScore(e.getValue(), opsSum))); + } + + private double calculateCurrentScore(double ops, double opsSum) { + if (opsSum > 0) { + return ops / opsSum; + } + return 0; + } + + private double calculateNewScore(String consumerId, double currentScore, double error) { + double rawScore = currentScore + scoringGain * error; + ExponentiallyWeightedMovingAverage average = scores.computeIfAbsent( + consumerId, + ignore -> new ExponentiallyWeightedMovingAverage(scoringWindowSize) + ); + double avg = average.update(rawScore, clock.instant()); + return ensureScoreRanges(avg); + } + + private double ensureScoreRanges(double score) { + return Math.max(Math.min(score, MAX_SCORE), MIN_SCORE); + } + + private Map calculateWeights(Collection consumers, Map newScores) { + Weight sum = consumers.stream() + .map(ConsumerNode::getWeight) + .reduce(Weight.ZERO, Weight::add); + Weight avgWeight = calculateAvgWeight(sum, consumers.size()); + List consumersWithoutScore = consumers.stream() + .filter(consumerNode -> !newScores.containsKey(consumerNode.getConsumerId())) + .collect(toList()); + Map newWeights = new HashMap<>(); + for (ConsumerNode consumerNode : consumersWithoutScore) { + newWeights.put(consumerNode.getConsumerId(), avgWeight); + sum = sum.subtract(avgWeight); + } + double newScoresSum = newScores.values().stream() + .mapToDouble(score -> score) + .sum(); + for (Map.Entry entry : newScores.entrySet()) { + Weight weight = sum.multiply(entry.getValue() / newScoresSum); + newWeights.put(entry.getKey(), weight); + } + return newWeights; + } + + private Weight calculateAvgWeight(Weight sum, int consumerCount) { + if (consumerCount == 0) { + return Weight.ZERO; + } + return sum.divide(consumerCount); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfile.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfile.java index 96ef3c7f90..ddcb1ae53a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfile.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfile.java @@ -1,11 +1,10 @@ package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; import java.time.Instant; -import java.util.Objects; class SubscriptionProfile { - static SubscriptionProfile UNDEFINED = new SubscriptionProfile(null, Weight.ZERO); + static SubscriptionProfile UNDEFINED = new SubscriptionProfile(Instant.MIN, Weight.ZERO); private final Instant lastRebalanceTimestamp; private final Weight weight; @@ -22,29 +21,4 @@ Weight getWeight() { Instant getLastRebalanceTimestamp() { return lastRebalanceTimestamp; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SubscriptionProfile profile = (SubscriptionProfile) o; - return Objects.equals(weight, profile.weight) - && Objects.equals(toMillis(lastRebalanceTimestamp), toMillis(profile.lastRebalanceTimestamp)); - } - - @Override - public int hashCode() { - return Objects.hash(toMillis(lastRebalanceTimestamp), weight); - } - - private Long toMillis(Instant timestamp) { - if (timestamp == null) { - return null; - } - return timestamp.toEpochMilli(); - } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfileProvider.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfileProvider.java deleted file mode 100644 index 71f2322e6f..0000000000 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfileProvider.java +++ /dev/null @@ -1,8 +0,0 @@ -package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; - -import pl.allegro.tech.hermes.api.SubscriptionName; - -public interface SubscriptionProfileProvider { - - SubscriptionProfile get(SubscriptionName subscriptionName); -} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfiles.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfiles.java index 3b267f4bec..32f205485b 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfiles.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfiles.java @@ -3,23 +3,15 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import java.time.Instant; -import java.util.HashMap; import java.util.Map; -import java.util.Objects; import java.util.Set; -import static pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.SubscriptionProfile.UNDEFINED; - class SubscriptionProfiles { - static final SubscriptionProfiles EMPTY = new SubscriptionProfiles(Map.of(), null); + static final SubscriptionProfiles EMPTY = new SubscriptionProfiles(Map.of(), Instant.MIN); private final Map profiles; - private Instant updateTimestamp; - - SubscriptionProfiles() { - this(new HashMap<>(), null); - } + private final Instant updateTimestamp; SubscriptionProfiles(Map profiles, Instant updateTimestamp) { this.profiles = profiles; @@ -35,44 +27,10 @@ Set getSubscriptions() { } SubscriptionProfile getProfile(SubscriptionName subscriptionName) { - return profiles.get(subscriptionName); - } - - SubscriptionProfile getProfileOrUndefined(SubscriptionName subscriptionName) { - return profiles.getOrDefault(subscriptionName, UNDEFINED); - } - - void updateProfile(SubscriptionName subscriptionName, SubscriptionProfile newProfile) { - profiles.put(subscriptionName, newProfile); - } - - void reset(Instant newUpdateTimestamp) { - profiles.clear(); - updateTimestamp = newUpdateTimestamp; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - SubscriptionProfiles that = (SubscriptionProfiles) o; - return Objects.equals(profiles, that.profiles) - && Objects.equals(toMillis(updateTimestamp), toMillis(that.updateTimestamp)); - } - - @Override - public int hashCode() { - return Objects.hash(profiles, toMillis(updateTimestamp)); + return profiles.getOrDefault(subscriptionName, SubscriptionProfile.UNDEFINED); } - private Long toMillis(Instant timestamp) { - if (timestamp == null) { - return null; - } - return timestamp.toEpochMilli(); + Map getProfiles() { + return profiles; } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculator.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculator.java index 8ea1dc4f1f..2e8c12bacf 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculator.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculator.java @@ -1,125 +1,60 @@ package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; import pl.allegro.tech.hermes.api.SubscriptionName; -import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener; -import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges; -import javax.annotation.concurrent.NotThreadSafe; import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; -@NotThreadSafe -public class SubscriptionProfilesCalculator implements SubscriptionProfileProvider, BalancingListener { +class SubscriptionProfilesCalculator { - private final ConsumerNodeLoadRegistry consumerNodeLoadRegistry; - private final SubscriptionProfileRegistry subscriptionProfileRegistry; private final Clock clock; private final Duration weightWindowSize; - private final SubscriptionProfiles profiles = new SubscriptionProfiles(); - - public SubscriptionProfilesCalculator(ConsumerNodeLoadRegistry consumerNodeLoadRegistry, - SubscriptionProfileRegistry subscriptionProfileRegistry, - Clock clock, - Duration weightWindowSize) { - this.consumerNodeLoadRegistry = consumerNodeLoadRegistry; - this.subscriptionProfileRegistry = subscriptionProfileRegistry; + SubscriptionProfilesCalculator(Clock clock, Duration weightWindowSize) { this.clock = clock; this.weightWindowSize = weightWindowSize; } - @Override - public void onBeforeBalancing(List activeConsumers) { - SubscriptionProfiles subscriptionProfiles = subscriptionProfileRegistry.fetch(); - Map currentWeights = createWeightCalculators(activeConsumers, subscriptionProfiles); - profiles.reset(clock.instant()); - for (Map.Entry entry : currentWeights.entrySet()) { + SubscriptionProfiles calculate(Collection consumerLoads, SubscriptionProfiles previousProfiles) { + Map currentWeights = calculateCurrentWeights(consumerLoads); + Map newProfiles = new HashMap<>(); + Instant now = clock.instant(); + for (Map.Entry entry : currentWeights.entrySet()) { SubscriptionName subscriptionName = entry.getKey(); - WeightCalculator weightCalculator = entry.getValue(); - SubscriptionProfile previousProfile = subscriptionProfiles.getProfile(subscriptionName); - SubscriptionProfile newProfile = new SubscriptionProfile( - previousProfile != null ? previousProfile.getLastRebalanceTimestamp() : null, - weightCalculator.calculateExponentiallyWeightedMovingAverage(profiles.getUpdateTimestamp()) - ); - profiles.updateProfile(subscriptionName, newProfile); + Weight currentWeight = entry.getValue(); + SubscriptionProfile newProfile = applyCurrentWeight(previousProfiles, subscriptionName, currentWeight, now); + newProfiles.put(subscriptionName, newProfile); } + return new SubscriptionProfiles(newProfiles, now); } - private Map createWeightCalculators(List activeConsumers, - SubscriptionProfiles subscriptionProfiles) { - Map weightCalculators = new HashMap<>(); - for (String consumerId : activeConsumers) { - ConsumerNodeLoad consumerNodeLoad = consumerNodeLoadRegistry.get(consumerId); - for (Map.Entry entry : consumerNodeLoad.getLoadPerSubscription().entrySet()) { + private Map calculateCurrentWeights(Collection consumerLoads) { + Map currentWeights = new HashMap<>(); + for (ConsumerNodeLoad consumerLoad : consumerLoads) { + for (Map.Entry entry : consumerLoad.getLoadPerSubscription().entrySet()) { SubscriptionName subscriptionName = entry.getKey(); - Weight currentWeight = new Weight(entry.getValue().getOperationsPerSecond()); - WeightCalculator weightCalculator = weightCalculators.computeIfAbsent( - subscriptionName, - subscription -> createWeightCalculator(subscriptionProfiles, subscriptionName) - ); - weightCalculator.update(currentWeight); + Weight currentConsumerWeight = new Weight(entry.getValue().getOperationsPerSecond()); + Weight currentMaxWeight = currentWeights.computeIfAbsent(subscriptionName, subscription -> Weight.ZERO); + Weight newMaxWeight = Weight.max(currentMaxWeight, currentConsumerWeight); + currentWeights.put(subscriptionName, newMaxWeight); } } - return weightCalculators; - } - - private WeightCalculator createWeightCalculator(SubscriptionProfiles subscriptionProfiles, SubscriptionName subscriptionName) { - SubscriptionProfile subscriptionProfile = subscriptionProfiles.getProfile(subscriptionName); - return new WeightCalculator( - weightWindowSize, - subscriptionProfile != null ? subscriptionProfile.getWeight() : null, - subscriptionProfiles.getUpdateTimestamp() - ); - } - - @Override - public void onAfterBalancing(WorkDistributionChanges changes) { - for (SubscriptionName subscriptionName : changes.getRebalancedSubscriptions()) { - SubscriptionProfile profile = profiles.getProfile(subscriptionName); - if (profile != null) { - profiles.updateProfile(subscriptionName, new SubscriptionProfile(clock.instant(), profile.getWeight())); - } - } - subscriptionProfileRegistry.persist(profiles); - } - - @Override - public SubscriptionProfile get(SubscriptionName subscriptionName) { - return profiles.getProfileOrUndefined(subscriptionName); + return currentWeights; } - private static class WeightCalculator { - - private final Duration weightWindowSize; - private final Weight previousWeight; - private final Instant previousUpdateTimestamp; - private Weight currentWeight = Weight.ZERO; - - WeightCalculator(Duration weightWindowSize, Weight previousWeight, Instant previousUpdateTimestamp) { - this.previousWeight = previousWeight; - this.weightWindowSize = weightWindowSize; - this.previousUpdateTimestamp = previousUpdateTimestamp; - } - - void update(Weight weight) { - currentWeight = Weight.max(weight, currentWeight); - } - - Weight calculateExponentiallyWeightedMovingAverage(Instant now) { - if (previousWeight == null || previousUpdateTimestamp == null) { - return currentWeight; - } - // This calculation is done in the same way as the Linux load average is calculated. - // See: https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works - Duration elapsed = Duration.between(previousUpdateTimestamp, now); - long elapsedMillis = Math.max(elapsed.toMillis(), 0); - double alpha = 1.0 - Math.exp(-1.0 * ((double) elapsedMillis / weightWindowSize.toMillis())); - return currentWeight.multiply(alpha) - .add(previousWeight.multiply(1.0 - alpha)); - } + private SubscriptionProfile applyCurrentWeight(SubscriptionProfiles previousProfiles, + SubscriptionName subscriptionName, + Weight currentWeight, + Instant now) { + SubscriptionProfile previousProfile = previousProfiles.getProfile(subscriptionName); + Weight previousWeight = previousProfile.getWeight(); + ExponentiallyWeightedMovingAverage average = new ExponentiallyWeightedMovingAverage(weightWindowSize); + average.update(previousWeight.getOperationsPerSecond(), previousProfiles.getUpdateTimestamp()); + double opsAvg = average.update(currentWeight.getOperationsPerSecond(), now); + return new SubscriptionProfile(previousProfile.getLastRebalanceTimestamp(), new Weight(opsAvg)); } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesDecoder.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesDecoder.java index 136e044241..dc7b7e5509 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesDecoder.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesDecoder.java @@ -68,7 +68,7 @@ private SubscriptionProfiles decodeSubscriptionProfiles(ProfilesDecoder body) { private Instant toInstant(long millis) { if (millis < 0) { - return null; + return Instant.MIN; } return Instant.ofEpochMilli(millis); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesEncoder.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesEncoder.java index 1bd88a782b..63dafb023b 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesEncoder.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesEncoder.java @@ -60,7 +60,7 @@ private Map mapToSubscriptionIds(Subscripti } private long toMillis(Instant timestamp) { - if (timestamp == null) { + if (timestamp == Instant.MIN) { return -1; } return timestamp.toEpochMilli(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/TargetWeightCalculator.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/TargetWeightCalculator.java new file mode 100644 index 0000000000..a8ea650fab --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/TargetWeightCalculator.java @@ -0,0 +1,9 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import java.util.Collection; +import java.util.Map; + +public interface TargetWeightCalculator { + + Map calculate(Collection consumers); +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancer.java index 5295aa1f79..8b5fda2343 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancer.java @@ -14,6 +14,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -39,16 +40,19 @@ public class WeightedWorkBalancer implements WorkBalancer { private final Clock clock; private final Duration stabilizationWindowSize; private final double minSignificantChangePercent; - private final SubscriptionProfileProvider subscriptionProfileProvider; + private final CurrentLoadProvider currentLoadProvider; + private final TargetWeightCalculator targetWeightCalculator; public WeightedWorkBalancer(Clock clock, Duration stabilizationWindowSize, double minSignificantChangePercent, - SubscriptionProfileProvider subscriptionProfileProvider) { + CurrentLoadProvider currentLoadProvider, + TargetWeightCalculator targetWeightCalculator) { this.clock = clock; this.stabilizationWindowSize = stabilizationWindowSize; this.minSignificantChangePercent = minSignificantChangePercent; - this.subscriptionProfileProvider = subscriptionProfileProvider; + this.currentLoadProvider = currentLoadProvider; + this.targetWeightCalculator = targetWeightCalculator; } @Override @@ -68,8 +72,9 @@ private AssignmentPlan restoreValidAssignments(List subscripti WorkloadConstraints constraints) { Map consumers = createConsumers(activeConsumerNodes, constraints); List unassignedTasks = new ArrayList<>(); + SubscriptionProfiles profiles = currentLoadProvider.getProfiles(); for (SubscriptionName subscriptionName : subscriptions) { - SubscriptionProfile subscriptionProfile = subscriptionProfileProvider.get(subscriptionName); + SubscriptionProfile subscriptionProfile = profiles.getProfile(subscriptionName); Queue consumerTasks = createConsumerTasks(subscriptionName, subscriptionProfile, constraints); Set consumerNodesForSubscription = currentState.getConsumerNodesForSubscription(subscriptionName); for (String consumerId : consumerNodesForSubscription) { @@ -86,7 +91,10 @@ private AssignmentPlan restoreValidAssignments(List subscripti private Map createConsumers(List activeConsumerNodes, WorkloadConstraints constraints) { return activeConsumerNodes.stream() - .map(consumerId -> new ConsumerNode(consumerId, constraints.getMaxSubscriptionsPerConsumer())) + .map(consumerId -> { + ConsumerNodeLoad consumerNodeLoad = currentLoadProvider.getConsumerNodeLoad(consumerId); + return new ConsumerNode(consumerId, consumerNodeLoad, constraints.getMaxSubscriptionsPerConsumer()); + }) .collect(toMap(ConsumerNode::getConsumerId, Function.identity())); } @@ -127,6 +135,7 @@ private AssignmentPlan rebalance(AssignmentPlan plan) { TargetConsumerLoad targetLoad = calculateTargetConsumerLoad(consumers); List overloadedConsumers = consumers.stream() .filter(consumerNode -> isOverloaded(consumerNode, targetLoad)) + .sorted(new MostOverloadedConsumerFirst(targetLoad)) .collect(toList()); for (ConsumerNode overloaded : overloadedConsumers) { List candidates = consumers.stream() @@ -147,24 +156,14 @@ private TargetConsumerLoad calculateTargetConsumerLoad(Collection int totalNumberOfTasks = consumers.stream().mapToInt(ConsumerNode::getAssignedTaskCount).sum(); int consumerCount = consumers.size(); int maxNumberOfTasksPerConsumer = consumerCount == 0 ? 0 : divideWithRoundingUp(totalNumberOfTasks, consumerCount); - Weight targetConsumerWeight = calculateTargetConsumerWeight(consumers); - return new TargetConsumerLoad(targetConsumerWeight, maxNumberOfTasksPerConsumer); + Map targetWeights = targetWeightCalculator.calculate(consumers); + return new TargetConsumerLoad(targetWeights, maxNumberOfTasksPerConsumer); } private int divideWithRoundingUp(int dividend, int divisor) { return (dividend / divisor) + (dividend % divisor > 0 ? 1 : 0); } - private Weight calculateTargetConsumerWeight(Collection consumers) { - if (consumers.isEmpty()) { - return Weight.ZERO; - } - Weight sum = consumers.stream() - .map(ConsumerNode::getWeight) - .reduce(Weight.ZERO, Weight::add); - return sum.divide(consumers.size()); - } - private void tryMoveOutTasks(ConsumerNode overloaded, ConsumerNode candidate, TargetConsumerLoad targetLoad) { List candidatesToMoveOut = overloaded.getAssignedTasks().stream() .filter(candidate::isNotAssigned) @@ -185,7 +184,8 @@ private boolean hasTooManyTasks(ConsumerNode consumerNode, TargetConsumerLoad ta } private boolean isProfitable(MoveOutProposal proposal, TargetConsumerLoad targetLoad) { - if (targetLoad.getWeight().isGreaterThanOrEqualTo(proposal.getFinalCandidateWeight())) { + Weight targetWeight = targetLoad.getWeightForConsumer(proposal.getCandidateId()); + if (targetWeight.isGreaterThanOrEqualTo(proposal.getFinalCandidateWeight())) { logger.debug("MoveOut proposal will be applied:\n{}", proposal); return true; } @@ -221,17 +221,18 @@ private List findTasksForMovingOut(ConsumerNode source, ConsumerNo } private boolean isStable(ConsumerTask task) { - return task.getLastRebalanceTimestamp() == null - || !task.getLastRebalanceTimestamp().plus(stabilizationWindowSize).isAfter(clock.instant()); + return clock.instant().isAfter(task.getLastRebalanceTimestamp().plus(stabilizationWindowSize)); } private boolean isOverloaded(ConsumerNode consumerNode, TargetConsumerLoad targetLoad) { - return consumerNode.getWeight().isGreaterThan(targetLoad.getWeight()) + Weight targetWeight = targetLoad.getWeightForConsumer(consumerNode.getConsumerId()); + return consumerNode.getWeight().isGreaterThan(targetWeight) || consumerNode.getAssignedTaskCount() > targetLoad.getNumberOfTasks(); } private boolean isBalanced(ConsumerNode consumerNode, TargetConsumerLoad targetLoad) { - return consumerNode.getWeight().isEqualTo(targetLoad.getWeight()) + Weight targetWeight = targetLoad.getWeightForConsumer(consumerNode.getConsumerId()); + return consumerNode.getWeight().isEqualTo(targetWeight) && consumerNode.getAssignedTaskCount() <= targetLoad.getNumberOfTasks(); } @@ -239,11 +240,13 @@ private boolean isProfitable(SwapProposal proposal, TargetConsumerLoad targetLoa Weight initialOverloadedWeight = proposal.getOverloadedWeight(); Weight finalOverloadedWeight = proposal.getFinalOverloadedWeight(); Weight finalCandidateWeight = proposal.getFinalCandidateWeight(); + Weight overloadedTargetWeight = targetLoad.getWeightForConsumer(proposal.getOverloadedId()); + Weight candidateTargetWeight = targetLoad.getWeightForConsumer(proposal.getCandidateId()); - if (initialOverloadedWeight.isLessThan(targetLoad.getWeight())) { + if (initialOverloadedWeight.isLessThan(overloadedTargetWeight)) { return false; } - if (finalCandidateWeight.isGreaterThan(targetLoad.getWeight())) { + if (finalCandidateWeight.isGreaterThan(candidateTargetWeight)) { return false; } if (finalOverloadedWeight.isGreaterThan(initialOverloadedWeight)) { @@ -311,6 +314,10 @@ Weight getFinalCandidateWeight() { return finalCandidateWeight; } + String getCandidateId() { + return candidate.getConsumerId(); + } + @Override public String toString() { return toString(overloaded, finalOverloadedWeight) + "\n" + toString(candidate, finalCandidateWeight); @@ -354,6 +361,14 @@ Weight getFinalCandidateWeight() { return finalCandidateWeight; } + String getCandidateId() { + return candidate.getConsumerId(); + } + + String getOverloadedId() { + return overloaded.getConsumerId(); + } + @Override public String toString() { return toString(overloaded, finalOverloadedWeight) + "\n" + toString(candidate, finalCandidateWeight); @@ -366,20 +381,38 @@ private String toString(ConsumerNode consumerNode, Weight newWeight) { private static class TargetConsumerLoad { - private final Weight weight; + private final Map weights; private final int numberOfTasks; - TargetConsumerLoad(Weight weight, int numberOfTasks) { - this.weight = weight; + TargetConsumerLoad(Map weights, int numberOfTasks) { + this.weights = weights; this.numberOfTasks = numberOfTasks; } - Weight getWeight() { - return weight; + Weight getWeightForConsumer(String consumerId) { + return weights.get(consumerId); } int getNumberOfTasks() { return numberOfTasks; } } + + private static class MostOverloadedConsumerFirst implements Comparator { + + private final TargetConsumerLoad targetLoad; + + MostOverloadedConsumerFirst(TargetConsumerLoad targetLoad) { + this.targetLoad = targetLoad; + } + + @Override + public int compare(ConsumerNode first, ConsumerNode second) { + Weight firstTargetLoad = targetLoad.getWeightForConsumer(first.getConsumerId()); + Weight secondTargetLoad = targetLoad.getWeightForConsumer(second.getConsumerId()); + Weight firstError = first.getWeight().subtract(firstTargetLoad); + Weight secondError = second.getWeight().subtract(secondTargetLoad); + return secondError.compareTo(firstError); + } + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListener.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListener.java new file mode 100644 index 0000000000..0d345ba9d1 --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListener.java @@ -0,0 +1,85 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import pl.allegro.tech.hermes.api.SubscriptionName; +import pl.allegro.tech.hermes.consumers.supervisor.workload.BalancingListener; +import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges; + +import java.time.Clock; +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; + +import static java.util.stream.Collectors.toMap; + +public class WeightedWorkBalancingListener implements BalancingListener { + + private final ConsumerNodeLoadRegistry consumerNodeLoadRegistry; + private final SubscriptionProfileRegistry subscriptionProfileRegistry; + private final CurrentLoadProvider currentLoadProvider; + private final WeightedWorkloadMetrics weightedWorkloadMetrics; + private final SubscriptionProfilesCalculator subscriptionProfilesCalculator; + private final Clock clock; + + public WeightedWorkBalancingListener(ConsumerNodeLoadRegistry consumerNodeLoadRegistry, + SubscriptionProfileRegistry subscriptionProfileRegistry, + CurrentLoadProvider currentLoadProvider, + WeightedWorkloadMetrics weightedWorkloadMetrics, + Clock clock, + Duration weightWindowSize) { + this.consumerNodeLoadRegistry = consumerNodeLoadRegistry; + this.subscriptionProfileRegistry = subscriptionProfileRegistry; + this.currentLoadProvider = currentLoadProvider; + this.weightedWorkloadMetrics = weightedWorkloadMetrics; + this.subscriptionProfilesCalculator = new SubscriptionProfilesCalculator(clock, weightWindowSize); + this.clock = clock; + } + + @Override + public void onBeforeBalancing(List activeConsumers) { + weightedWorkloadMetrics.unregisterMetricsForConsumersOtherThan(new HashSet<>(activeConsumers)); + Map newConsumerLoads = fetchConsumerNodeLoads(activeConsumers); + currentLoadProvider.updateConsumerNodeLoads(newConsumerLoads); + SubscriptionProfiles currentProfiles = recalculateSubscriptionProfiles(newConsumerLoads.values()); + currentLoadProvider.updateProfiles(currentProfiles); + } + + private Map fetchConsumerNodeLoads(List activeConsumers) { + return activeConsumers.stream() + .collect(toMap(Function.identity(), consumerNodeLoadRegistry::get)); + } + + private SubscriptionProfiles recalculateSubscriptionProfiles(Collection consumerNodeLoads) { + SubscriptionProfiles previousProfiles = subscriptionProfileRegistry.fetch(); + return subscriptionProfilesCalculator.calculate(consumerNodeLoads, previousProfiles); + } + + @Override + public void onAfterBalancing(WorkDistributionChanges changes) { + applyRebalanceTimestampToSubscriptionProfiles(changes.getRebalancedSubscriptions()); + } + + private void applyRebalanceTimestampToSubscriptionProfiles(Set rebalancedSubscriptions) { + SubscriptionProfiles currentProfiles = currentLoadProvider.getProfiles(); + Map profilePerSubscription = new HashMap<>(currentProfiles.getProfiles()); + for (SubscriptionName subscriptionName : rebalancedSubscriptions) { + SubscriptionProfile profile = profilePerSubscription.get(subscriptionName); + if (profile != null) { + profilePerSubscription.put(subscriptionName, new SubscriptionProfile(clock.instant(), profile.getWeight())); + } + } + SubscriptionProfiles finalProfiles = new SubscriptionProfiles(profilePerSubscription, currentProfiles.getUpdateTimestamp()); + subscriptionProfileRegistry.persist(finalProfiles); + currentLoadProvider.updateProfiles(finalProfiles); + } + + @Override + public void onBalancingSkipped() { + weightedWorkloadMetrics.unregisterLeaderMetrics(); + currentLoadProvider.clear(); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkloadMetrics.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkloadMetrics.java new file mode 100644 index 0000000000..761e8bec7f --- /dev/null +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkloadMetrics.java @@ -0,0 +1,102 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; + +import com.codahale.metrics.Gauge; +import pl.allegro.tech.hermes.common.metric.HermesMetrics; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static java.util.Collections.emptySet; +import static java.util.stream.Collectors.toSet; + +public class WeightedWorkloadMetrics { + + private static final String METRICS_PREFIX = "consumer-workload.weighted."; + private static final String CONSUMER_ID_PLACEHOLDER = "$consumerId"; + private static final String CURRENT_SCORE = METRICS_PREFIX + CONSUMER_ID_PLACEHOLDER + ".current-score"; + private static final String PROPOSED_SCORE = METRICS_PREFIX + CONSUMER_ID_PLACEHOLDER + ".proposed-score"; + private static final String SCORING_ERROR = METRICS_PREFIX + CONSUMER_ID_PLACEHOLDER + ".error"; + private static final String CURRENT_WEIGHT_OPS = METRICS_PREFIX + CONSUMER_ID_PLACEHOLDER + ".current-weight.ops"; + private static final String PROPOSED_WEIGHT_OPS = METRICS_PREFIX + CONSUMER_ID_PLACEHOLDER + ".proposed-weight.ops"; + + private final HermesMetrics metrics; + + private final Map currentWeights = new ConcurrentHashMap<>(); + private final Map proposedWeights = new ConcurrentHashMap<>(); + private final Map currentScores = new ConcurrentHashMap<>(); + private final Map proposedScores = new ConcurrentHashMap<>(); + private final Map scoringErrors = new ConcurrentHashMap<>(); + + public WeightedWorkloadMetrics(HermesMetrics metrics) { + this.metrics = metrics; + } + + void reportCurrentScore(String consumerId, double score) { + registerGaugeIfNeeded(currentScores, consumerId, CURRENT_SCORE); + currentScores.put(consumerId, score); + } + + void reportProposedScore(String consumerId, double score) { + registerGaugeIfNeeded(proposedScores, consumerId, PROPOSED_SCORE); + proposedScores.put(consumerId, score); + } + + void reportScoringError(String consumerId, double error) { + registerGaugeIfNeeded(scoringErrors, consumerId, SCORING_ERROR); + scoringErrors.put(consumerId, error); + } + + void reportCurrentWeights(Collection consumers) { + for (ConsumerNode consumerNode : consumers) { + String consumerId = consumerNode.getConsumerId(); + registerGaugeIfNeeded(currentWeights, consumerId, CURRENT_WEIGHT_OPS); + currentWeights.put(consumerId, consumerNode.getWeight().getOperationsPerSecond()); + } + } + + void reportProposedWeights(Map newWeights) { + for (Map.Entry entry : newWeights.entrySet()) { + String consumerId = entry.getKey(); + registerGaugeIfNeeded(proposedWeights, consumerId, PROPOSED_WEIGHT_OPS); + proposedWeights.put(consumerId, entry.getValue().getOperationsPerSecond()); + } + } + + private void registerGaugeIfNeeded(Map currentValues, String consumerId, String metric) { + if (!currentValues.containsKey(consumerId)) { + String metricPath = buildFullMetricPath(metric, consumerId); + metrics.registerGauge(metricPath, (Gauge) () -> currentValues.getOrDefault(consumerId, 0d)); + } + } + + void unregisterLeaderMetrics() { + unregisterMetricsForConsumersOtherThan(emptySet()); + } + + void unregisterMetricsForConsumersOtherThan(Set consumerIds) { + unregisterGaugesForConsumersOtherThan(currentWeights, consumerIds, CURRENT_WEIGHT_OPS); + unregisterGaugesForConsumersOtherThan(proposedWeights, consumerIds, PROPOSED_WEIGHT_OPS); + unregisterGaugesForConsumersOtherThan(currentScores, consumerIds, CURRENT_SCORE); + unregisterGaugesForConsumersOtherThan(proposedScores, consumerIds, PROPOSED_SCORE); + unregisterGaugesForConsumersOtherThan(scoringErrors, consumerIds, SCORING_ERROR); + } + + private void unregisterGaugesForConsumersOtherThan(Map currentValues, Set consumerIds, String metric) { + Set consumerIdsToRemove = currentValues.keySet().stream() + .filter(consumerId -> !consumerIds.contains(consumerId)) + .collect(toSet()); + for (String consumerId : consumerIdsToRemove) { + if (!consumerIds.contains(consumerId)) { + String metricPath = buildFullMetricPath(metric, consumerId); + metrics.unregisterGauge(metricPath); + currentValues.remove(consumerId); + } + } + } + + private String buildFullMetricPath(String metric, String consumerId) { + return metric.replace(CONSUMER_ID_PLACEHOLDER, HermesMetrics.escapeDots(consumerId)); + } +} diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperConsumerNodeLoadRegistry.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperConsumerNodeLoadRegistry.java index 4f1f5fce55..15ca3c6b5d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperConsumerNodeLoadRegistry.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperConsumerNodeLoadRegistry.java @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted; import com.codahale.metrics.Gauge; +import com.sun.management.OperatingSystemMXBean; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -13,6 +14,7 @@ import pl.allegro.tech.hermes.consumers.subscription.id.SubscriptionIds; import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths; +import java.lang.management.ManagementFactory; import java.time.Clock; import java.time.Duration; import java.util.Map; @@ -41,10 +43,12 @@ public class ZookeeperConsumerNodeLoadRegistry implements ConsumerNodeLoadRegist private final ConsumerNodeLoadEncoder encoder; private final ConsumerNodeLoadDecoder decoder; private final ScheduledExecutorService executor; + private final OperatingSystemMXBean platformMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class); private final Set subscriptionLoadRecorders = newSetFromMap(new ConcurrentHashMap<>()); - private volatile long lastReset = 0; + private volatile long lastReset; private volatile double currentOperationsPerSecond = 0d; + private volatile double cpuUtilization = -1; public ZookeeperConsumerNodeLoadRegistry(CuratorFramework curator, SubscriptionIds subscriptionIds, @@ -65,7 +69,12 @@ public ZookeeperConsumerNodeLoadRegistry(CuratorFramework curator, this.encoder = new ConsumerNodeLoadEncoder(subscriptionIds, consumerLoadEncoderBufferSizeBytes); this.decoder = new ConsumerNodeLoadDecoder(subscriptionIds); this.executor = executorServiceFactory.createSingleThreadScheduledExecutor("consumer-node-load-reporter-%d"); + this.lastReset = clock.millis(); metrics.registerGauge("consumer-workload.weighted.load.ops", (Gauge) () -> currentOperationsPerSecond); + metrics.registerGauge("consumer-workload.weighted.load.cpu-utilization", (Gauge) () -> cpuUtilization); + if (platformMXBean.getProcessCpuLoad() < 0d) { + logger.warn("Process CPU load is not available."); + } } @Override @@ -95,9 +104,10 @@ private ConsumerNodeLoad calculateConsumerNodeLoad() { long elapsedMillis = now - lastReset; long elapsedSeconds = Math.max(MILLISECONDS.toSeconds(elapsedMillis), 1); lastReset = now; + cpuUtilization = platformMXBean.getProcessCpuLoad(); Map loadPerSubscription = subscriptionLoadRecorders.stream() .collect(toMap(ZookeeperSubscriptionLoadRecorder::getSubscriptionName, recorder -> recorder.calculate(elapsedSeconds))); - return new ConsumerNodeLoad(loadPerSubscription); + return new ConsumerNodeLoad(cpuUtilization, loadPerSubscription); } private void persist(ConsumerNodeLoad metrics) throws Exception { diff --git a/hermes-consumers/src/main/resources/sbe/workload.xml b/hermes-consumers/src/main/resources/sbe/workload.xml index 89c8d7e71f..f75cd21185 100644 --- a/hermes-consumers/src/main/resources/sbe/workload.xml +++ b/hermes-consumers/src/main/resources/sbe/workload.xml @@ -13,9 +13,10 @@ - - - + + + + diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockConsumerNodeLoadRegistry.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockConsumerNodeLoadRegistry.groovy index ed08b7d1d0..e59193733b 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockConsumerNodeLoadRegistry.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockConsumerNodeLoadRegistry.groovy @@ -29,7 +29,7 @@ class MockConsumerNodeLoadRegistry implements ConsumerNodeLoadRegistry { MockConsumerNodeLoadRegistry operationsPerSecond(SubscriptionName subscriptionName, Map opsPerConsumer) { opsPerConsumer.entrySet().each { - ConsumerNodeLoad consumerNodeLoad = loads.getOrDefault(it.key, new ConsumerNodeLoad([:])) + ConsumerNodeLoad consumerNodeLoad = loads.getOrDefault(it.key, new ConsumerNodeLoad(1d, [:])) consumerNodeLoad.loadPerSubscription.put(subscriptionName, new SubscriptionLoad(it.value)) loads.put(it.key, consumerNodeLoad) } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockSubscriptionProfileRegistry.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockSubscriptionProfileRegistry.groovy index bd7c010b48..e49269f997 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockSubscriptionProfileRegistry.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/MockSubscriptionProfileRegistry.groovy @@ -4,42 +4,33 @@ import pl.allegro.tech.hermes.api.SubscriptionName import java.time.Instant -class MockSubscriptionProfileRegistry implements SubscriptionProfileRegistry, SubscriptionProfileProvider { +class MockSubscriptionProfileRegistry implements SubscriptionProfileRegistry { - private Instant updateTimestamp - private final Map profiles = new HashMap<>() - - @Override - SubscriptionProfile get(SubscriptionName subscriptionName) { - return profiles.getOrDefault(subscriptionName, SubscriptionProfile.UNDEFINED) - } + private SubscriptionProfiles subscriptionProfiles = SubscriptionProfiles.EMPTY @Override SubscriptionProfiles fetch() { - return new SubscriptionProfiles(profiles, updateTimestamp) + return subscriptionProfiles } @Override - void persist(SubscriptionProfiles profiles) { - - } - - Set getSubscriptionNames() { - return profiles.keySet() + void persist(SubscriptionProfiles profilesToPersist) { + subscriptionProfiles = profilesToPersist } MockSubscriptionProfileRegistry updateTimestamp(Instant updateTimestamp) { - this.updateTimestamp = updateTimestamp + subscriptionProfiles = new SubscriptionProfiles(subscriptionProfiles.getProfiles(), updateTimestamp) return this } MockSubscriptionProfileRegistry profile(SubscriptionName subscriptionName, Instant lastRebalanceTimestamp, Weight weight) { + def profiles = new HashMap<>(subscriptionProfiles.getProfiles()) profiles.put(subscriptionName, new SubscriptionProfile(lastRebalanceTimestamp, weight)) + subscriptionProfiles = new SubscriptionProfiles(profiles, subscriptionProfiles.updateTimestamp) return this } void reset() { - profiles.clear() - updateTimestamp = null + subscriptionProfiles = SubscriptionProfiles.EMPTY } } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculatorTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculatorTest.groovy new file mode 100644 index 0000000000..0fea2fa52c --- /dev/null +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ScoringTargetWeightCalculatorTest.groovy @@ -0,0 +1,168 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted + +import com.codahale.metrics.MetricRegistry +import pl.allegro.tech.hermes.api.SubscriptionName +import pl.allegro.tech.hermes.common.metric.HermesMetrics +import pl.allegro.tech.hermes.metrics.PathsCompiler +import spock.lang.Specification +import spock.lang.Subject + +import java.time.Clock +import java.time.Duration +import java.time.Instant + +class ScoringTargetWeightCalculatorTest extends Specification { + + def hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler("host")) + def scoringGain = 1.0d + + @Subject + def calculator = new ScoringTargetWeightCalculator( + new WeightedWorkloadMetrics(hermesMetrics), + Clock.systemDefaultZone(), + Duration.ofMinutes(15), + scoringGain + ) + + def "should assign weights accordingly to consumer performance"() { + given: + def consumers = [ + consumerNode("c1", 0.5, new Weight(100)), + consumerNode("c2", 1.0, new Weight(50)), + consumerNode("c3", 0.6, new Weight(50)) + ] + + when: + def weights = calculator.calculate(consumers) + + then: + // target CPU utilization = (0.5 + 1.0 + 0.6) / 3 = 0.7 + // CPU utilization errors: + // c1 = 0.7 - 0.5 = 0.2 + // c2 = 0.7 - 1.0 = -0.3 + // c3 = 0.7 - 0.6 = 0.1 + // + // total weight = 100 + 50 + 50 = 200 + // current scores: + // c1 = 100 / 200 = 0.5 + // c2 = 50 / 200 = 0.25 + // c3 = 50 / 200 = 0.25 + // + // new scores (current score + scoringGain * error) + // c1 = 0.5 + 1.0 * 0.2 = 0.7 + // c2 = 0.25 + 1.0 * (-0.3) = 0.01 (min score) + // c3 = 0.25 + 1.0 * 0.1 = 0.35 + // new scores sum = 0.7 + 0.01 + 0.35 = 1.06 + // + // new weights (total weight * (new score / new scores sum)): + // c1 = 200 * 0.7 / 1.06 ~ 132 + // c2 = 200 * 0.01 / 1.06 ~ 1.88 + // c3 = 200 * 0.35 / 1.06 ~ 66 + weights == [ + "c1": new Weight(132.07547169811323), + "c2": new Weight(1.8867924528301887), + "c3": new Weight(66.03773584905662) + ] + } + + def "should not change correct weights"() { + given: + def consumers = [ + consumerNode("c1", 0.5, new Weight(100)), + consumerNode("c2", 0.5, new Weight(70)), + consumerNode("c3", 0.5, new Weight(90)) + ] + + when: + def weights = calculator.calculate(consumers) + + then: + weights == [ + "c1": new Weight(100), + "c2": new Weight(70), + "c3": new Weight(90) + ] + } + + def "should use weight average when consumers have undefined load"() { + given: + def consumers = [ + consumerNode("c1", 0.7, new Weight(180)), + consumerNode("c2", 0.4, new Weight(60)), + consumerNodeWithUndefinedLoad("c3") + ] + + when: + def weights = calculator.calculate(consumers) + + then: + weights == [ + "c1": new Weight(96.00000000000003), + "c2": new Weight(64.00000000000001), + "c3": new Weight(80.0) + ] + } + + def "should use weight average when consumers have subscriptions assigned and undefined CPU utilization"() { + given: + def consumers = [ + consumerNode("c1", ConsumerNodeLoad.UNDEFINED.cpuUtilization, new Weight(100)), + consumerNode("c2", ConsumerNodeLoad.UNDEFINED.cpuUtilization, new Weight(60)), + consumerNode("c3", ConsumerNodeLoad.UNDEFINED.cpuUtilization, new Weight(80)), + ] + + when: + def weights = calculator.calculate(consumers) + + then: + weights == [ + "c1": new Weight(80), + "c2": new Weight(80), + "c3": new Weight(80) + ] + } + + def "should assign weight zero to all consumers when there are no subscriptions"() { + given: + def consumers = [ + consumerNodeWithUndefinedLoad("c1"), + consumerNodeWithUndefinedLoad("c2"), + consumerNodeWithUndefinedLoad("c3") + ] + + when: + def weights = calculator.calculate(consumers) + + then: + weights == [ + "c1": Weight.ZERO, + "c2": Weight.ZERO, + "c3": Weight.ZERO + ] + } + + def "should return empty map when consumer list is empty"() { + given: + def consumers = [] + + when: + def weights = calculator.calculate(consumers) + + then: + weights == [:] + } + + private static ConsumerNode consumerNode(String consumerId, double cpu, Weight weight) { + def subscription = SubscriptionName.fromString("pl.allegro.tech.hermes\$sub1") + def subscriptions = Map.of( + subscription, new SubscriptionLoad(weight.getOperationsPerSecond()) + ) + ConsumerNode consumerNode = new ConsumerNode(consumerId, new ConsumerNodeLoad(cpu, subscriptions), 3) + consumerNode.assign(new ConsumerTask(subscription, new SubscriptionProfile(Instant.now(), weight))) + return consumerNode + } + + private static ConsumerNode consumerNodeWithUndefinedLoad(String consumerId) { + return new ConsumerNode(consumerId, ConsumerNodeLoad.UNDEFINED, 3) + } +} diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesBuilder.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesBuilder.groovy new file mode 100644 index 0000000000..29f44defb6 --- /dev/null +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesBuilder.groovy @@ -0,0 +1,25 @@ +package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted + +import pl.allegro.tech.hermes.api.SubscriptionName + +import java.time.Instant + +class SubscriptionProfilesBuilder { + + private final Map profiles = new HashMap<>() + private Instant rebalanceTimestamp + + SubscriptionProfilesBuilder withRebalanceTimestamp(Instant rebalanceTimestamp) { + this.rebalanceTimestamp = rebalanceTimestamp + return this + } + + SubscriptionProfilesBuilder withProfile(SubscriptionName subscriptionName, Weight weight) { + profiles.put(subscriptionName, new SubscriptionProfile(rebalanceTimestamp, weight)) + return this + } + + SubscriptionProfiles build() { + return new SubscriptionProfiles(profiles, Instant.MIN) + } +} diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancerTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancerTest.groovy index 3c8bc33ca9..4af44c4073 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancerTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancerTest.groovy @@ -1,13 +1,18 @@ package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted +import com.codahale.metrics.MetricRegistry import pl.allegro.tech.hermes.api.Constraints import pl.allegro.tech.hermes.api.SubscriptionName import pl.allegro.tech.hermes.api.TopicName +import pl.allegro.tech.hermes.common.metric.HermesMetrics import pl.allegro.tech.hermes.consumers.supervisor.workload.SubscriptionAssignmentViewBuilder import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkloadConstraints +import pl.allegro.tech.hermes.metrics.PathsCompiler import pl.allegro.tech.hermes.test.helper.time.ModifiableClock import spock.lang.Specification +import java.time.Duration + import static java.time.Duration.ofHours import static pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkloadAssertions.assertThat @@ -17,14 +22,15 @@ class WeightedWorkBalancerTest extends Specification { def "should balance taking into account subscription weight"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(10d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(500d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(10d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c1", "c2") @@ -57,14 +63,15 @@ class WeightedWorkBalancerTest extends Specification { def "should start by swapping the heaviest subscriptions"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(3d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(2d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(1d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(0d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(3d)) + .withProfile(subscription("sub2"), new Weight(2d)) + .withProfile(subscription("sub3"), new Weight(1d)) + .withProfile(subscription("sub4"), new Weight(0d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1") .withAssignment(subscription("sub2"), "c1") @@ -102,17 +109,18 @@ class WeightedWorkBalancerTest extends Specification { def "should not transfer subscriptions from lighter to heavier consumer through swapping"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(3d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(0d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(0.1d)) - .profile(subscription("sub5"), previousRebalanceTimestamp, new Weight(0.1d)) - .profile(subscription("sub6"), previousRebalanceTimestamp, new Weight(0.1d)) - .profile(subscription("sub7"), previousRebalanceTimestamp, new Weight(0.1d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(3d)) + .withProfile(subscription("sub2"), new Weight(0d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(0.1d)) + .withProfile(subscription("sub5"), new Weight(0.1d)) + .withProfile(subscription("sub6"), new Weight(0.1d)) + .withProfile(subscription("sub7"), new Weight(0.1d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0.1d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0.1d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1") .withAssignment(subscription("sub2"), "c1") @@ -150,14 +158,15 @@ class WeightedWorkBalancerTest extends Specification { def "should handle weight equal to 0"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, Weight.ZERO) - .profile(subscription("sub2"), previousRebalanceTimestamp, Weight.ZERO) - .profile(subscription("sub3"), previousRebalanceTimestamp, Weight.ZERO) - .profile(subscription("sub4"), previousRebalanceTimestamp, Weight.ZERO) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), Weight.ZERO) + .withProfile(subscription("sub2"), Weight.ZERO) + .withProfile(subscription("sub3"), Weight.ZERO) + .withProfile(subscription("sub4"), Weight.ZERO) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c1", "c2") @@ -186,14 +195,15 @@ class WeightedWorkBalancerTest extends Specification { def "should assign unassigned subscriptions to least loaded consumers"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub5"), previousRebalanceTimestamp, new Weight(490d)) - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, subscriptionProfileRegistry) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(500d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(10d)) + .withProfile(subscription("sub5"), new Weight(490d)) + .build() + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c3") .withAssignment(subscription("sub2"), "c2") @@ -206,7 +216,7 @@ class WeightedWorkBalancerTest extends Specification { .build() when: - def balanced = balancer.balance(subscriptionProfileRegistry.subscriptionNames as List, initial.consumerNodes as List, initial, constraints) + def balanced = balancer.balance(subscriptionProfiles.subscriptions as List, initial.consumerNodes as List, initial, constraints) then: // c1 = sub3+sub5 = 10+490 = 500 @@ -222,13 +232,14 @@ class WeightedWorkBalancerTest extends Specification { def "should not overload underloaded node while rebalancing"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(500d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1") .withAssignment(subscription("sub2"), "c2") @@ -244,7 +255,7 @@ class WeightedWorkBalancerTest extends Specification { clock.advance(stabilizationWindow.plusMinutes(1)) when: - def balanced = balancer.balance(subscriptionProfileRegistry.subscriptionNames as List, initial.consumerNodes as List, initial, constraints) + def balanced = balancer.balance(subscriptionProfiles.subscriptions as List, initial.consumerNodes as List, initial, constraints) then: assertThat(balanced) @@ -255,15 +266,16 @@ class WeightedWorkBalancerTest extends Specification { def "should not rebalance when change is insignificant"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(5d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(10d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(5d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(10d)) + .build() def stabilizationWindow = ofHours(1) def minSignificantChangePercent = 10.0d - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, minSignificantChangePercent, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, minSignificantChangePercent, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c3", "c4") @@ -303,14 +315,15 @@ class WeightedWorkBalancerTest extends Specification { def "should not rebalance if the stabilization window has not elapsed yet"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(10d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(500d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(10d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c1", "c2") @@ -339,14 +352,15 @@ class WeightedWorkBalancerTest extends Specification { def "should rebalance if the stabilization window has elapsed"() { given: - def previousRebalanceTimestamp = clock.instant() - def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() - .profile(subscription("sub1"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub2"), previousRebalanceTimestamp, new Weight(500d)) - .profile(subscription("sub3"), previousRebalanceTimestamp, new Weight(10d)) - .profile(subscription("sub4"), previousRebalanceTimestamp, new Weight(10d)) + def subscriptionProfiles = new SubscriptionProfilesBuilder() + .withRebalanceTimestamp(clock.instant()) + .withProfile(subscription("sub1"), new Weight(500d)) + .withProfile(subscription("sub2"), new Weight(500d)) + .withProfile(subscription("sub3"), new Weight(10d)) + .withProfile(subscription("sub4"), new Weight(10d)) + .build() def stabilizationWindow = ofHours(1) - def balancer = new WeightedWorkBalancer(clock, stabilizationWindow, 0d, subscriptionProfileRegistry) + def balancer = createWeightedWorkBalancer(stabilizationWindow, 0d, subscriptionProfiles) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c1", "c2") @@ -375,7 +389,7 @@ class WeightedWorkBalancerTest extends Specification { def "should not change assignments when rebalance is not needed"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c2", "c3") @@ -396,7 +410,7 @@ class WeightedWorkBalancerTest extends Specification { def "should remove an inactive subscriptions"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c2", "c3") @@ -421,7 +435,7 @@ class WeightedWorkBalancerTest extends Specification { def "should remove an inactive consumer"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c2", "c3") @@ -447,7 +461,7 @@ class WeightedWorkBalancerTest extends Specification { def "should assign subscriptions to a new consumer node"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1") .withAssignment(subscription("sub2"), "c2") @@ -475,7 +489,7 @@ class WeightedWorkBalancerTest extends Specification { def "should respect limit of subscriptions per consumer"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1"), "c1", "c2") .withAssignment(subscription("sub2"), "c1", "c2") @@ -500,7 +514,7 @@ class WeightedWorkBalancerTest extends Specification { def "should respect workload constraints"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def topic = TopicName.fromQualifiedName("pl.allegro.tech.hermes") def initial = new SubscriptionAssignmentViewBuilder() .withAssignment(subscription("sub1", topic), "c1", "c2") @@ -529,7 +543,7 @@ class WeightedWorkBalancerTest extends Specification { def "should balance work for one consumer node"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder().build() def constraints = WorkloadConstraints.builder() .withConsumersPerSubscription(2) @@ -550,7 +564,7 @@ class WeightedWorkBalancerTest extends Specification { def "should balance one subscription"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder().build() def constraints = WorkloadConstraints.builder() .withConsumersPerSubscription(2) @@ -570,7 +584,7 @@ class WeightedWorkBalancerTest extends Specification { def "should not create assignments when there are no consumers"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder().build() def constraints = WorkloadConstraints.builder() .withConsumersPerSubscription(2) @@ -589,7 +603,7 @@ class WeightedWorkBalancerTest extends Specification { def "should not create assignments when there are no subscriptions"() { given: - def balancer = new WeightedWorkBalancer(clock, ofHours(1), 0d, new MockSubscriptionProfileRegistry()) + def balancer = createWeightedWorkBalancer(ofHours(1), 0d, SubscriptionProfiles.EMPTY) def initial = new SubscriptionAssignmentViewBuilder().build() def constraints = WorkloadConstraints.builder() .withConsumersPerSubscription(2) @@ -613,4 +627,20 @@ class WeightedWorkBalancerTest extends Specification { private static SubscriptionName subscription(String name, TopicName topicName) { return new SubscriptionName(name, topicName) } + + private WeightedWorkBalancer createWeightedWorkBalancer(Duration stabilizationWindowSize, + double minSignificantChangePercent, + SubscriptionProfiles subscriptionProfiles) { + CurrentLoadProvider currentLoadProvider = new CurrentLoadProvider() + currentLoadProvider.updateProfiles(subscriptionProfiles) + HermesMetrics hermesMetrics = new HermesMetrics(new MetricRegistry(), new PathsCompiler("host")) + WeightedWorkloadMetrics workloadMetrics = new WeightedWorkloadMetrics(hermesMetrics) + return new WeightedWorkBalancer( + clock, + stabilizationWindowSize, + minSignificantChangePercent, + currentLoadProvider, + new AvgTargetWeightCalculator(workloadMetrics) + ) + } } diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculatorTest.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListenerTest.groovy similarity index 56% rename from hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculatorTest.groovy rename to hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListenerTest.groovy index e30c44e79d..f1575e7cb8 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/SubscriptionProfilesCalculatorTest.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/WeightedWorkBalancingListenerTest.groovy @@ -1,7 +1,11 @@ package pl.allegro.tech.hermes.consumers.supervisor.workload.weighted +import com.codahale.metrics.MetricFilter +import com.codahale.metrics.MetricRegistry import pl.allegro.tech.hermes.api.SubscriptionName +import pl.allegro.tech.hermes.common.metric.HermesMetrics import pl.allegro.tech.hermes.consumers.supervisor.workload.WorkDistributionChanges +import pl.allegro.tech.hermes.metrics.PathsCompiler import pl.allegro.tech.hermes.test.helper.time.ModifiableClock import spock.lang.Specification import spock.lang.Subject @@ -10,18 +14,23 @@ import java.time.Duration import static pl.allegro.tech.hermes.consumers.supervisor.workload.weighted.WeightedWorkloadAssertions.assertThat -class SubscriptionProfilesCalculatorTest extends Specification { +class WeightedWorkBalancingListenerTest extends Specification { def clock = new ModifiableClock() def workDistributionChanges = Mock(WorkDistributionChanges) def consumerNodeLoadRegistry = new MockConsumerNodeLoadRegistry() def subscriptionProfileRegistry = new MockSubscriptionProfileRegistry() def weightWindowSize = Duration.ofMinutes(1) + def currentLoadProvider = new CurrentLoadProvider() + def metricsRegistry = new MetricRegistry() + def metrics = new WeightedWorkloadMetrics(new HermesMetrics(metricsRegistry, new PathsCompiler("host"))) @Subject - def calculator = new SubscriptionProfilesCalculator( + def listener = new WeightedWorkBalancingListener( consumerNodeLoadRegistry, subscriptionProfileRegistry, + currentLoadProvider, + metrics, clock, weightWindowSize ) @@ -29,7 +38,7 @@ class SubscriptionProfilesCalculatorTest extends Specification { def cleanup() { consumerNodeLoadRegistry.reset() subscriptionProfileRegistry.reset() - calculator.onBeforeBalancing([]) + currentLoadProvider.clear() } def "should take the maximal value of operations per second as the subscription weight"() { @@ -40,15 +49,12 @@ class SubscriptionProfilesCalculatorTest extends Specification { .operationsPerSecond(subscription("sub3"), ["c3": 10d, "c4": 10d]) when: - calculator.onBeforeBalancing(["c1", "c2", "c3"]) + listener.onBeforeBalancing(["c1", "c2", "c3"]) then: - assertThat(calculator.get(subscription("sub1"))) - .hasWeight(new Weight(500d)) - assertThat(calculator.get(subscription("sub2"))) - .hasWeight(new Weight(500d)) - assertThat(calculator.get(subscription("sub3"))) - .hasWeight(new Weight(10d)) + assertSubscriptionWeight(subscription("sub1"), new Weight(500d)) + assertSubscriptionWeight(subscription("sub2"), new Weight(500d)) + assertSubscriptionWeight(subscription("sub3"), new Weight(10d)) } def "should calculate weight using exponentially weighted moving average"() { @@ -62,21 +68,19 @@ class SubscriptionProfilesCalculatorTest extends Specification { when: clock.advance(weightWindowSize.minusSeconds(30)) - calculator.onBeforeBalancing(["c1", "c2"]) + listener.onBeforeBalancing(["c1", "c2"]) then: - assertThat(calculator.get(subscription("sub1"))) - .hasWeight(new Weight(80.32653298563167d)) + assertSubscriptionWeight(subscription("sub1"), new Weight(80.32653298563167d)) when: workDistributionChanges.getRebalancedSubscriptions() >> [] - calculator.onAfterBalancing(workDistributionChanges) + listener.onAfterBalancing(workDistributionChanges) clock.advance(weightWindowSize.minusSeconds(30)) - calculator.onBeforeBalancing(["c1", "c2"]) + listener.onBeforeBalancing(["c1", "c2"]) then: - assertThat(calculator.get(subscription("sub1"))) - .hasWeight(new Weight(68.39397205857212d)) + assertSubscriptionWeight(subscription("sub1"), new Weight(68.39397205857212d)) } def "should return previous weight when the new timestamp is before the previous one"() { @@ -90,11 +94,10 @@ class SubscriptionProfilesCalculatorTest extends Specification { when: clock.advanceMinutes(-1) - calculator.onBeforeBalancing(["c1", "c2"]) + listener.onBeforeBalancing(["c1", "c2"]) then: - assertThat(calculator.get(subscription("sub1"))) - .hasWeight(new Weight(100d)) + assertSubscriptionWeight(subscription("sub1"), new Weight(100d)) } def "should take 0 as the default weight"() { @@ -109,15 +112,26 @@ class SubscriptionProfilesCalculatorTest extends Specification { when: clock.advance(weightWindowSize.minusSeconds(30)) - calculator.onBeforeBalancing(["c1", "c2"]) + listener.onBeforeBalancing(["c1", "c2"]) then: - assertThat(calculator.get(subscription("sub1"))) - .hasWeight(new Weight(80.32653298563167d)) - assertThat(calculator.get(subscription("sub2"))) - .hasWeight(Weight.ZERO) - assertThat(calculator.get(subscription("sub3"))) - .hasWeight(Weight.ZERO) + assertSubscriptionWeight(subscription("sub1"), new Weight(80.32653298563167d)) + assertSubscriptionWeight(subscription("sub2"), Weight.ZERO) + assertSubscriptionWeight(subscription("sub3"), Weight.ZERO) + } + + def "should take value of operations per second reported by consumers as initial weight"() { + given: + subscriptionProfileRegistry + .persist(SubscriptionProfiles.EMPTY) + consumerNodeLoadRegistry + .operationsPerSecond(subscription("sub1"), ["c1": 50d, "c2": 50d]) + + when: + listener.onBeforeBalancing(["c1", "c2"]) + + then: + assertSubscriptionWeight(subscription("sub1"), new Weight(50d)) } def "should update rebalance timestamp"() { @@ -135,19 +149,50 @@ class SubscriptionProfilesCalculatorTest extends Specification { clock.advanceMinutes(1) and: - calculator.onBeforeBalancing(["c1", "c2", "c3"]) + listener.onBeforeBalancing(["c1", "c2", "c3"]) when: - calculator.onAfterBalancing(workDistributionChanges) + listener.onAfterBalancing(workDistributionChanges) then: - assertThat(calculator.get(subscription("sub1"))) + def profiles = subscriptionProfileRegistry.fetch() + assertThat(profiles.getProfile(subscription("sub1"))) .hasLastRebalanceTimestamp(clock.instant()) - assertThat(calculator.get(subscription("sub2"))) + assertThat(profiles.getProfile(subscription("sub2"))) .hasLastRebalanceTimestamp(previousRebalanceTimestamp) } + def "should unregister workload metrics for inactive consumers"() { + given: + metrics.reportCurrentScore("c1", 0.5d) + metrics.reportCurrentScore("c2", 1.5d) + + when: + listener.onBeforeBalancing(["c2"]) + + then: + metricsRegistry.getGauges(MetricFilter.contains(".c2.")).size() == 1 + metricsRegistry.getGauges(MetricFilter.contains(".c1.")).size() == 0 + } + + def "should unregister workload metrics when the consumer is no longer a leader"() { + given: + metrics.reportCurrentScore("c1", 0.5d) + metrics.reportCurrentScore("c2", 1.5d) + + when: + listener.onBalancingSkipped() + + then: + metricsRegistry.getGauges().size() == 0 + } + private static SubscriptionName subscription(String name) { return SubscriptionName.fromString("pl.allegro.tech.hermes\$$name") } + + private void assertSubscriptionWeight(SubscriptionName subscriptionName, Weight weight) { + def profiles = currentLoadProvider.getProfiles() + assertThat(profiles.getProfile(subscriptionName)).hasWeight(weight) + } } diff --git a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperSubscriptionProfileRegistryTest.java b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperSubscriptionProfileRegistryTest.java index b6fff8071b..107d5cc0fe 100644 --- a/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperSubscriptionProfileRegistryTest.java +++ b/hermes-consumers/src/test/java/pl/allegro/tech/hermes/consumers/supervisor/workload/weighted/ZookeeperSubscriptionProfileRegistryTest.java @@ -9,9 +9,11 @@ import pl.allegro.tech.hermes.test.helper.zookeeper.ZookeeperBaseTest; import java.time.Instant; +import java.util.Comparator; import java.util.List; import java.util.Map; +import static java.time.temporal.ChronoUnit.MILLIS; import static org.assertj.core.api.Assertions.assertThat; public class ZookeeperSubscriptionProfileRegistryTest extends ZookeeperBaseTest { @@ -35,7 +37,7 @@ public void shouldPersistAndReadSubscriptionProfiles() { SubscriptionProfiles profiles = new SubscriptionProfiles( Map.of( firstSubscription, new SubscriptionProfile(Instant.now(), new Weight(100d)), - secondSubscription, new SubscriptionProfile(Instant.now(), Weight.ZERO) + secondSubscription, SubscriptionProfile.UNDEFINED ), Instant.now() ); @@ -45,6 +47,41 @@ secondSubscription, new SubscriptionProfile(Instant.now(), Weight.ZERO) // then SubscriptionProfiles readProfiles = registry.fetch(); - assertThat(readProfiles).isEqualTo(profiles); + assertThatProfilesAreEqual(readProfiles, profiles); + } + + @Test + public void shouldPersistAndReadEmptySubscriptionProfiles() { + // given + ZookeeperSubscriptionProfileRegistry registry = new ZookeeperSubscriptionProfileRegistry( + zookeeperClient, + new TestSubscriptionIds(List.of()), + new ZookeeperPaths("/hermes"), + "kafka-cluster", + 100_000 + ); + SubscriptionProfiles profiles = SubscriptionProfiles.EMPTY; + + // when + registry.persist(profiles); + + // then + SubscriptionProfiles readProfiles = registry.fetch(); + assertThatProfilesAreEqual(readProfiles, profiles); + } + + private static void assertThatProfilesAreEqual(SubscriptionProfiles actual, SubscriptionProfiles expected) { + assertThat(actual) + .usingRecursiveComparison() + .withComparatorForType(new InstantComparatorIgnoringNanos(), Instant.class) + .isEqualTo(expected); + } + + private static class InstantComparatorIgnoringNanos implements Comparator { + + @Override + public int compare(Instant o1, Instant o2) { + return o1.truncatedTo(MILLIS).compareTo(o2.truncatedTo(MILLIS)); + } } }