Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching #23231

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
d565b92
Refactor dispatching filtering logic
lhotari Aug 26, 2024
a5bade9
Limit Key_Shared look ahead
lhotari Aug 27, 2024
ff2db40
Move KeySharedSubscriptionTest out of flaky test group
lhotari Aug 27, 2024
25e6858
Simplify method
lhotari Aug 27, 2024
8e4328f
More code comments
lhotari Aug 27, 2024
b577134
Fix handling of permits
lhotari Aug 27, 2024
b9dad9f
Fix assertions
lhotari Aug 27, 2024
749d699
Improve readability
lhotari Aug 27, 2024
d1f13a8
Remove permits check when entries have already been read
lhotari Aug 27, 2024
a263ad4
Rename configuration property that uses msgInReplay concept
lhotari Aug 28, 2024
5e8d5b5
Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/p…
lhotari Aug 30, 2024
01bac5d
Take available permits into account in dispatching
lhotari Aug 30, 2024
b4ca8e8
Add basic initial filtering
lhotari Aug 30, 2024
88a05d2
Update field name in javadoc
lhotari Sep 2, 2024
dd1eca5
Permits are for entries/batches, not individual messages
lhotari Sep 2, 2024
b61da7e
Remove message level handling related to permits since permits are at…
lhotari Sep 2, 2024
d344a47
Remove DispatcherDiscardFilter from PersistentDispatcherMultipleConsu…
lhotari Sep 2, 2024
cfbd935
Use getAvailablePermits method to check if consumer has permits
lhotari Sep 2, 2024
8a4b4c4
Add isKeySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist handl…
lhotari Sep 2, 2024
1cf36e6
Rename "hasConsumersNeededNormalRead" to "isNormalReadAllowed"
lhotari Sep 2, 2024
c60d29f
Currently closing consumers shouldn't have permits for dispatching
lhotari Sep 2, 2024
3b6eb0d
Add utility methods for logging topic stats and internal stats in tests
lhotari Sep 2, 2024
9a572d2
Log topic stats in tests for troubleshooting
lhotari Sep 2, 2024
648112f
Fix getAvailablePermits calculation for key_shared
lhotari Sep 2, 2024
898e715
Update comment about permits
lhotari Sep 2, 2024
abfef07
Drop unnecessary strict rule for stopping dispatching to a specific c…
lhotari Sep 2, 2024
bd8655c
Simplify filtering logic
lhotari Sep 2, 2024
d51701c
Remove the unnecessary DispatcherDiscardFilter
lhotari Sep 2, 2024
0e21ef2
Merge remote-tracking branch 'origin/master' into lh-limit-keyshared-…
lhotari Sep 2, 2024
115a109
skip consumers that have recently closed
lhotari Sep 2, 2024
dcb55da
fix permit calculations also for Shared subscriptions
lhotari Sep 2, 2024
e66a018
Use different method signature
lhotari Sep 2, 2024
a3355bf
Add mocking for consumer.cnx().isActive()
lhotari Sep 3, 2024
88256d6
Address review comment concern about performance regression
lhotari Sep 6, 2024
e7e4c83
Optimize further
lhotari Sep 6, 2024
423c58e
Use hasRecentlyJoinedConsumers method
lhotari Sep 6, 2024
e629ce9
cache maxLastSentPosition for replays
lhotari Sep 6, 2024
7a4c34e
Refactor the solution: always look ahead when it would be useful
lhotari Sep 6, 2024
e4792f3
Enforce "look ahead" limit for Key_Shared
lhotari Sep 6, 2024
6d5b114
Check permits earlier in the logic
lhotari Sep 6, 2024
950dee4
Fix look ahead logic in replay mode
lhotari Sep 6, 2024
0d469ec
Fix null handling
lhotari Sep 6, 2024
910da2c
Revisit logic to make as much progress as possible
lhotari Sep 6, 2024
209b69e
Fix limit calculation logic and fix test that relied on previous logic
lhotari Sep 6, 2024
a48a3fb
Improve effective limit logic, add test
lhotari Sep 6, 2024
f78f192
Rename method for more clarity
lhotari Sep 6, 2024
2d3e0d5
Update javadoc for isNormalReadAllowed
lhotari Sep 6, 2024
ca47e33
Update isNormalReadAllowed in super class
lhotari Sep 6, 2024
bf89d96
Fix formula mentioned in doc (max->min)
lhotari Sep 7, 2024
e937a4e
Fix IntelliJ warning
lhotari Sep 7, 2024
4c777af
Implement ConcurrentBitmapSortedLongPairSet.first
lhotari Sep 10, 2024
9532f87
Merge remote-tracking branch 'origin/master' into lh-limit-keyshared-…
lhotari Sep 11, 2024
0cdd5f7
Add testing utilities
lhotari Sep 13, 2024
02b18f6
Merge remote-tracking branch 'origin/master' into HEAD
lhotari Sep 18, 2024
b0d9fa0
Add test for read limit and fix the detected issue
lhotari Sep 13, 2024
7570136
Reset read ahead limits to defaults after test method
lhotari Sep 18, 2024
8804bd0
Speed up tests using receiveAndAckMessages by consuming in multiple t…
lhotari Sep 18, 2024
ba596a9
Increase the default read ahead limit to 2000/50000
lhotari Sep 18, 2024
48ff1af
Follow the advice of setting to <= 2 * managedLedgerMaxUnackedRangesT…
lhotari Sep 18, 2024
57eaa77
Use max unacked messages limits when look ahead limits are disabled
lhotari Sep 18, 2024
c9e72c4
Fix flaky test
lhotari Sep 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,32 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *
lhotari marked this conversation as resolved.
Show resolved Hide resolved
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
keySharedLookAheadMsgInReplayThresholdPerConsumer=1000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
# Setting this value to 0 will disable the limit calculated per subscription.
keySharedLookAheadMsgInReplayThresholdPerSubscription=10000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# This setting controls whether look ahead is enabled when recently joined consumers are present.
keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist=false

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false

Expand Down
26 changes: 26 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,32 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
keySharedLookAheadMsgInReplayThresholdPerConsumer=1000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
# Setting this value to 0 will disable the limit calculated per subscription.
keySharedLookAheadMsgInReplayThresholdPerSubscription=10000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# This setting controls whether look ahead is enabled when recently joined consumers are present.
keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist=false

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,48 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
+ " check and broker doesn't block dispatchers")
private int maxUnackedMessagesPerBroker = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
+ " messages reaches the calculated threshold.\n"
+ "Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+ ".\n"
+ "Setting this value to 0 will disable the limit calculated per consumer.",
dynamic = true
)
private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 1000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
+ " messages reaches the calculated threshold.\n"
+ "Formula: threshold = max(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+ ".\n"
+ "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
+ "Setting this value to 0 will disable the limit calculated per subscription.\n",
dynamic = true
)
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 10000;


@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
+ " messages reaches the calculated threshold.\n"
+ "This setting controls whether look ahead is enabled when recently joined consumers are present.",
dynamic = true
)
private boolean keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist = false;
lhotari marked this conversation as resolved.
Show resolved Hide resolved

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
Expand Down Expand Up @@ -146,8 +149,33 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
return false;
}

public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionFactory::create);
public boolean containsStickyKeyHash(int stickyKeyHash) {
return !allowOutOfOrderDelivery && hashesRefCount.containsKey(stickyKeyHash);
}

public Optional<Position> getFirstPositionInReplay() {
NavigableSet<Position> items = messagesToRedeliver.items(1, PositionFactory::create);
return items.isEmpty() ? Optional.empty() : Optional.of(items.first());
}

/**
* Get the messages to replay now.
*
* @param maxMessagesToRead
* the max messages to read
* @param filter
* the filter to use to select the messages to replay
* @return the messages to replay now
*/
public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, Predicate<Position> filter) {
NavigableSet<Position> items = new TreeSet<>();
messagesToRedeliver.processItems(PositionFactory::create, item -> {
if (filter.test(item)) {
items.add(item);
}
return items.size() < maxMessagesToRead;
});
return items;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,26 +345,24 @@ public synchronized void readMoreEntries() {
return;
}

NavigableSet<Position> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
NavigableSet<Position> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {
Set<Position> messagesToReplayNow =
canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet();
if (!messagesToReplayNow.isEmpty()) {
equanz marked this conversation as resolved.
Show resolved Hide resolved
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule replay of {} messages for {} consumers", name,
messagesToReplayFiltered.size(), consumerList.size());
messagesToReplayNow.size(), consumerList.size());
}

havePendingReplayRead = true;
minReplayedPosition = messagesToReplayNow.first();
updateMinReplayedPosition();
Set<? extends Position> deletedMessages = topic.isDelayedDeliveryEnabled()
? asyncReplayEntriesInOrder(messagesToReplayFiltered)
: asyncReplayEntries(messagesToReplayFiltered);
? asyncReplayEntriesInOrder(messagesToReplayNow)
: asyncReplayEntries(messagesToReplayNow);
// clear already acked positions from replay bucket

deletedMessages.forEach(position -> redeliveryMessages.remove(position.getLedgerId(),
position.getEntryId()));
// if all the entries are acked-entries and cleared up from redeliveryMessages, try to read
// next entries as readCompletedEntries-callback was never called
if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) {
if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) {
havePendingReplayRead = false;
readMoreEntriesAsync();
}
Expand All @@ -386,13 +384,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
NavigableSet<Position> toReplay = getMessagesToReplayNow(1);
if (!toReplay.isEmpty()) {
minReplayedPosition = toReplay.first();
redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId());
equanz marked this conversation as resolved.
Show resolved Hide resolved
} else {
minReplayedPosition = null;
}
updateMinReplayedPosition();

// Filter out and skip read delayed messages exist in DelayedDeliveryTracker
if (delayedDeliveryTracker.isPresent()) {
Expand Down Expand Up @@ -427,6 +419,19 @@ public synchronized void readMoreEntries() {
}
}

/**
* Controls whether replaying entries is currently enabled.
* Subclasses can override this method to temporarily disable replaying entries.
* @return true if replaying entries is currently enabled
*/
protected boolean canReplayMessages() {
return true;
}

private void updateMinReplayedPosition() {
minReplayedPosition = getFirstPositionInReplay().orElse(null);
}

private boolean shouldPauseOnAckStatePersist(ReadType readType) {
// Allows new consumers to consume redelivered messages caused by the just-closed consumer.
if (readType != ReadType.Normal) {
Expand Down Expand Up @@ -751,8 +756,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
int remainingMessages = 0;
boolean hasChunk = false;
for (int i = 0; i < metadataArray.length; i++) {
final MessageMetadata metadata = Commands.peekAndCopyMessageMetadata(
entries.get(i).getDataBuffer(), subscription.toString(), -1);
Entry entry = entries.get(i);
MessageMetadata metadata = entry instanceof EntryAndMetadata ? ((EntryAndMetadata) entry).getMetadata()
: Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1);
if (metadata != null) {
remainingMessages += metadata.getNumMessagesInBatch();
if (!hasChunk && metadata.hasUuid()) {
Expand Down Expand Up @@ -1232,25 +1238,30 @@ protected synchronized NavigableSet<Position> getMessagesToReplayNow(int maxMess
delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead);
messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId()));
}

if (!redeliveryMessages.isEmpty()) {
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead);
return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, createFilterForReplay());
} else {
return Collections.emptyNavigableSet();
}
}

protected Optional<Position> getFirstPositionInReplay() {
return redeliveryMessages.getFirstPositionInReplay();
}

/**
* This is a mode method designed for Key_Shared mode.
* Creates a stateful filter for filtering replay positions.
* This is only used for Key_Shared mode to skip replaying certain entries.
* Filter out the entries that will be discarded due to the order guarantee mechanism of Key_Shared mode.
* This method is in order to avoid the scenario below:
* - Get positions from the Replay queue.
* - Read entries from BK.
* - The order guarantee mechanism of Key_Shared mode filtered out all the entries.
* - Delivery non entry to the client, but we did a BK read.
*/
protected NavigableSet<Position> filterOutEntriesWillBeDiscarded(NavigableSet<Position> src) {
return src;
protected Predicate<Position> createFilterForReplay() {
// pick all positions from the replay
return position -> true;
}

/**
Expand Down
Loading
Loading