Skip to content

Commit

Permalink
[improve][broker] Add limits for Key_Shared Subscription look ahead i…
Browse files Browse the repository at this point in the history
…n dispatching (#23231)

Co-authored-by: Matteo Merli <[email protected]>
Co-authored-by: Yuri Mizushima <[email protected]>
  • Loading branch information
3 people authored Sep 18, 2024
1 parent bf53164 commit 77570d5
Show file tree
Hide file tree
Showing 16 changed files with 1,068 additions and 384 deletions.
19 changes: 19 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,25 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
keySharedLookAheadMsgInReplayThresholdPerConsumer=2000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
# Setting this value to 0 will disable the limit calculated per subscription.
keySharedLookAheadMsgInReplayThresholdPerSubscription=20000

# Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled)
unblockStuckSubscriptionEnabled=false

Expand Down
19 changes: 19 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,25 @@ maxUnackedMessagesPerBroker=0
# limit/2 messages
maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# Setting this value to 0 will disable the limit calculated per consumer.
keySharedLookAheadMsgInReplayThresholdPerConsumer=2000

# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer
# or a blocked key hash (because of ordering constraints), the broker will continue reading more
# messages from the backlog and attempt to dispatch them to consumers until the number of replay
# messages reaches the calculated threshold.
# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *
# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription).
# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.
# Setting this value to 0 will disable the limit calculated per subscription.
keySharedLookAheadMsgInReplayThresholdPerSubscription=20000

# Tick time to schedule task that checks topic publish rate limiting across all topics
# Reducing to lower value can give more accuracy while throttling publish but
# it uses more CPU to perform frequent check. (Disable publish throttling with value 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,36 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
+ " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit"
+ " check and broker doesn't block dispatchers")
private int maxUnackedMessagesPerBroker = 0;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
+ " messages reaches the calculated threshold.\n"
+ "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+ ".\n"
+ "Setting this value to 0 will disable the limit calculated per consumer.",
dynamic = true
)
private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer"
+ " or a blocked key hash (because of ordering constraints), the broker will continue reading more"
+ " messages from the backlog and attempt to dispatch them to consumers until the number of replay"
+ " messages reaches the calculated threshold.\n"
+ "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *"
+ " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)"
+ ".\n"
+ "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n"
+ "Setting this value to 0 will disable the limit calculated per subscription.\n",
dynamic = true
)
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
Expand Down Expand Up @@ -146,8 +149,32 @@ public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
return false;
}

public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead) {
return messagesToRedeliver.items(maxMessagesToRead, PositionFactory::create);
public boolean containsStickyKeyHash(int stickyKeyHash) {
return !allowOutOfOrderDelivery && hashesRefCount.containsKey(stickyKeyHash);
}

public Optional<Position> getFirstPositionInReplay() {
return messagesToRedeliver.first(PositionFactory::create);
}

/**
* Get the messages to replay now.
*
* @param maxMessagesToRead
* the max messages to read
* @param filter
* the filter to use to select the messages to replay
* @return the messages to replay now
*/
public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, Predicate<Position> filter) {
NavigableSet<Position> items = new TreeSet<>();
messagesToRedeliver.processItems(PositionFactory::create, item -> {
if (filter.test(item)) {
items.add(item);
}
return items.size() < maxMessagesToRead;
});
return items;
}

/**
Expand Down
Loading

0 comments on commit 77570d5

Please sign in to comment.