Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Add limits for Key_Shared Subscription look ahead in dispatching #23231

Merged

Conversation

lhotari
Copy link
Member

@lhotari lhotari commented Aug 27, 2024

Fixes #23200

Motivation

This PR introduces enhancements to the Key_Shared subscription dispatching logic in the Pulsar broker. The changes aim to optimize message dispatching, improve the handling of slow consumers, and address several identified issues related to message replay and message "look ahead" when consumers are blocked in delivery due to key ordering restrictions.

This PR adds limits to the Key_Shared Subscription look-ahead feature, which was introduced in PR #7105. Additionally, there are several improvements to the previous logic.

Since there wasn't any named concept for the feature added in PR #7105, it has been named "key shared look ahead" in this PR. This references the feature added in PR #7105, where the dispatcher will skip replaying messages and read more messages from the backlog. The changes in PR #7105 didn't add a limit for reading more messages. The comments in the PR mention that it will be limited by the unacked message limits. However, this isn't the case since the read messages aren't considered unacked messages until they have been dispatched to consumers.

Reading ahead would be triggered in multiple cases:

  • When consumers are backpressured due to slow processing or message delivery
  • When consumers are blocked due to key ordering restrictions

Since consumers were blocked unnecessarily, reading ahead would often just continue reading the complete backlog into the replay queue. The impact is described in #23200

In addition to fix the infinite key shared look ahead issue, this PR seems to fix a problem in the existing implementation where this code blocks progress of a consumer:

if (readType == ReadType.Normal && stickyKeyHashes != null
&& redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
// If redeliveryMessages contains messages that correspond to the same hash as the messages
// that the dispatcher is trying to send, do not send those messages for order guarantee
return 0;
}

What this means is that when the dispatcher has read a batch, it will discard all messages in the read messages if any hash is present in the replay queue (redelivery messages). This is an unoptimal solution for ensuring that ordering is preserved. This PR addresses the issue while attempting to keep backwards compatibility from a single consumer's perspective.

The "look ahead" limit introduced in this PR will add backpressure to how the dispatcher works. Eventually consumers will be slower than the dispatcher sending messages to consumers and that's why it's necessary to have the limit, also in cases where the hashes wouldn't be blocked or "allowOutOfOrderDelivery" mode is used.

There's also another related problem that this change addresses. The head of the replay queue might get blocked in the release Pulsar versions.

NavigableSet<Position> messagesToReplayNow = getMessagesToReplayNow(messagesToRead);
NavigableSet<Position> messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow);
if (!messagesToReplayFiltered.isEmpty()) {

In this PR, the solution to this is to take as many entries that match the criteria from the replay queue. This required changing the getMessagesToReplayNow to accept a Predicate for filtering:

    public NavigableSet<Position> getMessagesToReplayNow(int maxMessagesToRead, Predicate<Position> filter) {
        NavigableSet<Position> items = new TreeSet<>();
        messagesToRedeliver.processItems(PositionFactory::create, item -> {
            if (filter.test(item)) {
                items.add(item);
            }
            return items.size() < maxMessagesToRead;
        });
        return items;
    }

Modifications

  1. Configuration Properties:

    • Added new configuration properties to control the limits of Key_Shared subscriptions look ahead in dispatching:
      • keySharedLookAheadMsgInReplayThresholdPerConsumer
      • keySharedLookAheadMsgInReplayThresholdPerSubscription
      • keySharedLookAheadEnabledWhenRecentlyJoinedConsumersExist
  2. Broker Configuration:

    • Updated broker.conf to include descriptions and default values for the new configuration properties.
  3. Key_Shared Subscription Logic:

    • Implement the key shared look-ahead limits that are configured with the above configuration properties.
    • Enhance the dispatching to ensure efficient dispatching:
      • Instead of filtering the list of entries pulled from the message redelivery controller, refactor the solution to take a function that can filter the entries until there's a sufficient amount of messages for dispatching.
        • This refactoring eliminates the previous method filterOutEntriesWillBeDiscarded while preserving the previous logic of filtering.
      • Remove the unnecessary thread-local fields localGroupedEntries and the unused localGroupedPositions.
        • It is unnecessary to eliminate the creation of short-lived objects in modern JVMs. It just clutters the code without any performance benefit. "No garbage" style is relevant mainly for long-lived objects stored in caches or large array allocations.
    • Replace getMessagesToReplayNow(1) with getFirstPositionInReplay().
      • This check will no longer have the side effect of pulling messages from delayed delivery when it's called.
    • Add a new method canReplayMessages to make the look-ahead logic more explicit. The previous solution of returning an empty set from the getMessagesToReplayNow method made it hard to understand how the look-ahead behavior is implemented.
    • Rename the isDispatcherStuckOnReplays field to skipNextReplayToTriggerLookAhead to make the meaning of the field explicit and self-descriptive.
    • Fix issue where the !havePendingReplayRead status wasn't checked before starting a new read.
      • For shared subscription, it's fine to just check !havePendingPendingRead, but for Key_Shared it isn't.
      • extract doesntHavePendingRead for handling the logic
    • Rename the keyNumbers variable to remainingConsumersToFinishSending so that the meaning of the variable is explicit and self-descriptive.
    • Rename the groupedEntries variable to entriesByConsumerForDispatching so that the meaning of the variable is explicit and self-descriptive.
    • Rename the entriesWithSameKey variable to entriesForConsumer so that the variable is self-descriptive.
    • Fix calculations based on maxUnackedMessages and unackedMessages. The unit is in individual messages and permits are in entries. These 2 cannot be mixed.
    • Addressing the consumer blocked issue: the logic has been changed to evaluate each message hash one-by-one.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Aug 27, 2024
@lhotari lhotari added ready-to-test and removed doc-required Your PR changes impact docs and you will update later. labels Aug 27, 2024
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Aug 27, 2024
@lhotari lhotari marked this pull request as ready for review August 27, 2024 12:45
@lhotari lhotari marked this pull request as draft August 27, 2024 12:51
@lhotari
Copy link
Member Author

lhotari commented Aug 27, 2024

This PR is ready for initial review feedback. I'm looking into ways of adding tests for the key_shared look ahead limits.

@lhotari lhotari marked this pull request as ready for review August 29, 2024 19:31
@lhotari lhotari force-pushed the lh-limit-keyshared-dispatching-read-ahead branch from 62a9fbf to a263ad4 Compare August 29, 2024 19:31
@lhotari lhotari marked this pull request as draft September 6, 2024 12:52
@lhotari
Copy link
Member Author

lhotari commented Sep 6, 2024

Noticing more challenges to cover while getting into the tests. Setting this to draft mode until the concerns are addressed and tests have been added. Some of the concerns are related to #23264.

conf/broker.conf Outdated Show resolved Hide resolved
@lhotari lhotari marked this pull request as ready for review September 18, 2024 11:15
@lhotari lhotari added this to the 4.0.0 milestone Sep 18, 2024
@lhotari lhotari merged commit 77570d5 into apache:master Sep 18, 2024
53 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required Your PR changes impact docs and you will update later. ready-to-test
10 participants