Skip to content

Commit

Permalink
Fix getConsumerKeyHashRanges
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Sep 20, 2024
1 parent 0055a24 commit ac7fdeb
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
* It is not changed unless a consumer is removed or a colliding consumer with higher priority is added.
*/
private static class HashRingEntry {
private final List<Consumer> consumers;
// This class is used to store the consumer and the order in which it was added to the hash ring
// sorting will be by priority, consumer name and the order in which it was added
record ConsumerEntry(Consumer consumer, int addOrder) {
}

private final List<ConsumerEntry> consumers;
private final int hash;
Consumer selectedConsumer;

Expand All @@ -70,22 +75,24 @@ public HashRingEntry(int hash) {
}

public void addConsumer(Consumer consumer) {
consumers.add(consumer);
selectConsumer();
consumers.add(new ConsumerEntry(consumer, consumers.size()));
selectConsumer(true);
}

public void removeConsumer(Consumer consumer) {
consumers.remove(consumer);
selectConsumer();
consumers.removeIf(consumerEntry -> consumerEntry.consumer().equals(consumer));
selectConsumer(false);
}

public Consumer getSelectedConsumer() {
return selectedConsumer;
}

private void selectConsumer() {
private void selectConsumer(boolean performSorting) {
if (consumers.size() > 1) {
sortConsumersByPriorityLevelAndName(consumers);
if (performSorting) {
sortConsumersByPriorityLevelAndName(consumers);
}
List<Consumer> priorityConsumers = pickPriorityConsumers(consumers);
if (priorityConsumers.size() > 1) {
// use the hash to select a consumer from the priority consumers
Expand All @@ -104,24 +111,27 @@ private void selectConsumer() {
selectedConsumer = priorityConsumers.get(0);
}
} else if (consumers.size() == 1) {
selectedConsumer = consumers.get(0);
selectedConsumer = consumers.get(0).consumer();
} else {
selectedConsumer = null;
}
}

private static List<Consumer> pickPriorityConsumers(List<Consumer> consumers) {
Consumer firstConsumer = consumers.get(0);
private static List<Consumer> pickPriorityConsumers(List<ConsumerEntry> consumers) {
Consumer firstConsumer = consumers.get(0).consumer();
List<Consumer> priorityConsumers = consumers.stream()
.takeWhile(c -> c.getPriorityLevel() == firstConsumer.getPriorityLevel()
&& c.consumerName().equals(firstConsumer.consumerName()))
.takeWhile(c -> c.consumer().getPriorityLevel() == firstConsumer.getPriorityLevel()
&& c.consumer().consumerName().equals(firstConsumer.consumerName()))
.map(ConsumerEntry::consumer)
.toList();
return priorityConsumers;
}

private static void sortConsumersByPriorityLevelAndName(List<Consumer> consumers) {
consumers.sort(Comparator.comparing(Consumer::getPriorityLevel).reversed()
.thenComparing(Consumer::consumerName));
private static void sortConsumersByPriorityLevelAndName(List<ConsumerEntry> consumers) {
consumers.sort(Comparator.<ConsumerEntry, Integer>
comparing(entry -> entry.consumer().getPriorityLevel()).reversed()
.thenComparing(entry -> entry.consumer().consumerName())
.thenComparing(ConsumerEntry::addOrder));
}
}

Expand Down Expand Up @@ -191,12 +201,24 @@ public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
Map<Consumer, List<Range>> result = new LinkedHashMap<>();
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
return result;
}
int start = 0;
int lastKey = 0;
for (Map.Entry<Integer, HashRingEntry> entry: hashRing.entrySet()) {
Consumer consumer = entry.getValue().getSelectedConsumer();
result.computeIfAbsent(consumer, key -> new ArrayList<>())
.add(Range.of(start, entry.getKey()));
start = entry.getKey() + 1;
lastKey = entry.getKey();
start = lastKey + 1;
}
// Handle wrap-around
HashRingEntry firstHashRingEntry = hashRing.firstEntry().getValue();
Consumer firstSelectedConsumer = firstHashRingEntry.getSelectedConsumer();
List<Range> ranges = result.get(firstSelectedConsumer);
if (lastKey != Integer.MAX_VALUE - 1) {
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
}
} finally {
rwLock.readLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
Expand Down Expand Up @@ -148,9 +149,15 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
for (String s : consumerName) {
Consumer consumer = mock(Consumer.class);
when(consumer.consumerName()).thenReturn(s);
when(consumer.getPriorityLevel()).thenReturn(0);
when(consumer.toString()).thenReturn(s);
selector.addConsumer(consumer);
consumers.add(consumer);
}

assertThat(selector.getConsumerKeyHashRanges())
.containsExactlyEntriesOf(selector.getConsumerKeyHashRanges());

Map<Consumer, List<Range>> expectedResult = new HashMap<>();
expectedResult.put(consumers.get(0), Arrays.asList(
Range.of(119056335, 242013991),
Expand All @@ -159,17 +166,13 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
expectedResult.put(consumers.get(1), Arrays.asList(
Range.of(0, 90164503),
Range.of(90164504, 119056334),
Range.of(382436668, 722195656)));
Range.of(382436668, 722195656),
Range.of(1914695767, 2147483646)));
expectedResult.put(consumers.get(2), Arrays.asList(
Range.of(242013992, 242377547),
Range.of(242377548, 382436667),
Range.of(1656011843, 1707482097)));
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
System.out.println(entry.getValue());
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
expectedResult.remove(entry.getKey());
}
Assert.assertEquals(expectedResult.size(), 0);
assertThat(selector.getConsumerKeyHashRanges()).containsExactlyInAnyOrderEntriesOf(expectedResult);
}

// reproduces https://github.com/apache/pulsar/issues/22050
Expand Down

0 comments on commit ac7fdeb

Please sign in to comment.