diff --git a/conf/broker.conf b/conf/broker.conf index ed59e5c456695..74130d709cdd2 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -355,6 +355,25 @@ maxUnackedMessagesPerBroker=0 # limit/2 messages maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 +# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer +# or a blocked key hash (because of ordering constraints), the broker will continue reading more +# messages from the backlog and attempt to dispatch them to consumers until the number of replay +# messages reaches the calculated threshold. +# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer * +# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription). +# Setting this value to 0 will disable the limit calculated per consumer. +keySharedLookAheadMsgInReplayThresholdPerConsumer=2000 + +# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer +# or a blocked key hash (because of ordering constraints), the broker will continue reading more +# messages from the backlog and attempt to dispatch them to consumers until the number of replay +# messages reaches the calculated threshold. +# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer * +# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription). +# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist. +# Setting this value to 0 will disable the limit calculated per subscription. +keySharedLookAheadMsgInReplayThresholdPerSubscription=20000 + # Broker periodically checks if subscription is stuck and unblock if flag is enabled. (Default is disabled) unblockStuckSubscriptionEnabled=false diff --git a/conf/standalone.conf b/conf/standalone.conf index d5d79e0383e1f..622949bf6c325 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -232,6 +232,25 @@ maxUnackedMessagesPerBroker=0 # limit/2 messages maxUnackedMessagesPerSubscriptionOnBrokerBlocked=0.16 +# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer +# or a blocked key hash (because of ordering constraints), the broker will continue reading more +# messages from the backlog and attempt to dispatch them to consumers until the number of replay +# messages reaches the calculated threshold. +# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer * +# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription). +# Setting this value to 0 will disable the limit calculated per consumer. +keySharedLookAheadMsgInReplayThresholdPerConsumer=2000 + +# For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer +# or a blocked key hash (because of ordering constraints), the broker will continue reading more +# messages from the backlog and attempt to dispatch them to consumers until the number of replay +# messages reaches the calculated threshold. +# Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer * +# connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription). +# This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist. +# Setting this value to 0 will disable the limit calculated per subscription. 
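+# Example: with the defaults above and 5 connected consumers, the effective threshold is
+# min(2000 * 5, 20000) = 10000 replay messages for the subscription.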
+keySharedLookAheadMsgInReplayThresholdPerSubscription=20000 + # Tick time to schedule task that checks topic publish rate limiting across all topics # Reducing to lower value can give more accuracy while throttling publish but # it uses more CPU to perform frequent check. (Disable publish throttling with value 0) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 60f37f52b6b8c..42dc959426692 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -949,6 +949,36 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece + " back and unack count reaches to `limit/2`. Using a value of 0, is disabling unackedMessage-limit" + " check and broker doesn't block dispatchers") private int maxUnackedMessagesPerBroker = 0; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer" + + " or a blocked key hash (because of ordering constraints), the broker will continue reading more" + + " messages from the backlog and attempt to dispatch them to consumers until the number of replay" + + " messages reaches the calculated threshold.\n" + + "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *" + + " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)" + + ".\n" + + "Setting this value to 0 will disable the limit calculated per consumer.", + dynamic = true + ) + private int keySharedLookAheadMsgInReplayThresholdPerConsumer = 2000; + + @FieldContext( + category = CATEGORY_POLICIES, + doc = "For Key_Shared subscriptions, if messages cannot be dispatched to consumers due to a slow consumer" + + " or a blocked key hash (because of ordering constraints), the broker will continue reading more" + + " messages from the backlog and attempt to dispatch them to consumers until the number of replay" + + " messages reaches the calculated threshold.\n" + + "Formula: threshold = min(keySharedLookAheadMsgInReplayThresholdPerConsumer *" + + " connected consumer count, keySharedLookAheadMsgInReplayThresholdPerSubscription)" + + ".\n" + + "This value should be set to a value less than 2 * managedLedgerMaxUnackedRangesToPersist.\n" + + "Setting this value to 0 will disable the limit calculated per subscription.\n", + dynamic = true + ) + private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000; + @FieldContext( category = CATEGORY_POLICIES, doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java index 9d29b93ca450d..fa6e1412151b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -22,7 +22,10 @@ import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.Optional; import java.util.Set; +import java.util.TreeSet; +import java.util.function.Predicate; import 
javax.annotation.concurrent.NotThreadSafe; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.PositionFactory; @@ -146,8 +149,32 @@ public boolean containsStickyKeyHashes(Set stickyKeyHashes) { return false; } - public NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { - return messagesToRedeliver.items(maxMessagesToRead, PositionFactory::create); + public boolean containsStickyKeyHash(int stickyKeyHash) { + return !allowOutOfOrderDelivery && hashesRefCount.containsKey(stickyKeyHash); + } + + public Optional getFirstPositionInReplay() { + return messagesToRedeliver.first(PositionFactory::create); + } + + /** + * Get the messages to replay now. + * + * @param maxMessagesToRead + * the max messages to read + * @param filter + * the filter to use to select the messages to replay + * @return the messages to replay now + */ + public NavigableSet getMessagesToReplayNow(int maxMessagesToRead, Predicate filter) { + NavigableSet items = new TreeSet<>(); + messagesToRedeliver.processItems(PositionFactory::create, item -> { + if (filter.test(item)) { + items.add(item); + } + return items.size() < maxMessagesToRead; + }); + return items; } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 264bac7cb6aab..450a446c85a78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -135,6 +135,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final ExecutorService dispatchMessagesThread; private final SharedConsumerAssignor assignor; protected int lastNumberOfEntriesDispatched; + protected boolean skipNextBackoff; private final Backoff retryBackoff; protected enum ReadType { Normal, Replay @@ -345,26 +346,24 @@ public synchronized void readMoreEntries() { return; } - NavigableSet messagesToReplayNow = getMessagesToReplayNow(messagesToRead); - NavigableSet messagesToReplayFiltered = filterOutEntriesWillBeDiscarded(messagesToReplayNow); - if (!messagesToReplayFiltered.isEmpty()) { + Set messagesToReplayNow = + canReplayMessages() ? getMessagesToReplayNow(messagesToRead) : Collections.emptySet(); + if (!messagesToReplayNow.isEmpty()) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule replay of {} messages for {} consumers", name, - messagesToReplayFiltered.size(), consumerList.size()); + messagesToReplayNow.size(), consumerList.size()); } - havePendingReplayRead = true; - minReplayedPosition = messagesToReplayNow.first(); + updateMinReplayedPosition(); Set deletedMessages = topic.isDelayedDeliveryEnabled() - ? asyncReplayEntriesInOrder(messagesToReplayFiltered) - : asyncReplayEntries(messagesToReplayFiltered); + ? 
asyncReplayEntriesInOrder(messagesToReplayNow) + : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> redeliveryMessages.remove(position.getLedgerId(), position.getEntryId())); // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called - if ((messagesToReplayFiltered.size() - deletedMessages.size()) == 0) { + if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; readMoreEntriesAsync(); } @@ -373,7 +372,11 @@ public synchronized void readMoreEntries() { log.debug("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription()); } - } else if (!havePendingRead && hasConsumersNeededNormalRead()) { + } else if (doesntHavePendingRead()) { + if (!isNormalReadAllowed()) { + handleNormalReadNotAllowed(); + return; + } if (shouldPauseOnAckStatePersist(ReadType.Normal)) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Skipping read for the topic, Due to blocked on ack state persistent.", @@ -386,13 +389,9 @@ public synchronized void readMoreEntries() { consumerList.size()); } havePendingRead = true; - NavigableSet toReplay = getMessagesToReplayNow(1); - if (!toReplay.isEmpty()) { - minReplayedPosition = toReplay.first(); - redeliveryMessages.add(minReplayedPosition.getLedgerId(), minReplayedPosition.getEntryId()); - } else { - minReplayedPosition = null; - } + updateMinReplayedPosition(); + + messagesToRead = Math.min(messagesToRead, getMaxEntriesReadLimit()); // Filter out and skip read delayed messages exist in DelayedDeliveryTracker if (delayedDeliveryTracker.isPresent()) { @@ -410,14 +409,7 @@ public synchronized void readMoreEntries() { } } else { if (log.isDebugEnabled()) { - if (!messagesToReplayNow.isEmpty()) { - log.debug("[{}] [{}] Skipping read for the topic: because all entries in replay queue were" - + " filtered out due to the mechanism of Key_Shared mode, and the left consumers have" - + " no permits now", - topic.getName(), getSubscriptionName()); - } else { - log.debug("[{}] Cannot schedule next read until previous one is done", name); - } + log.debug("[{}] Cannot schedule next read until previous one is done", name); } } } else { @@ -427,6 +419,43 @@ public synchronized void readMoreEntries() { } } + /** + * Sets a hard limit on the number of entries to read from the Managed Ledger. + * Subclasses can override this method to set a different limit. + * By default, this method does not impose an additional limit. + * + * @return the maximum number of entries to read from the Managed Ledger + */ + protected int getMaxEntriesReadLimit() { + return Integer.MAX_VALUE; + } + + /** + * Checks if there's a pending read operation that hasn't completed yet. + * This allows to avoid scheduling a new read operation while the previous one is still in progress. + * @return true if there's a pending read operation + */ + protected boolean doesntHavePendingRead() { + return !havePendingRead; + } + + protected void handleNormalReadNotAllowed() { + // do nothing + } + + /** + * Controls whether replaying entries is currently enabled. + * Subclasses can override this method to temporarily disable replaying entries. 
+ * @return true if replaying entries is currently enabled + */ + protected boolean canReplayMessages() { + return true; + } + + private void updateMinReplayedPosition() { + minReplayedPosition = getFirstPositionInReplay().orElse(null); + } + private boolean shouldPauseOnAckStatePersist(ReadType readType) { // Allows new consumers to consume redelivered messages caused by the just-closed consumer. if (readType != ReadType.Normal) { @@ -446,6 +475,10 @@ protected void reScheduleRead() { reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS); } + protected synchronized void reScheduleReadWithBackoff() { + reScheduleReadInMs(retryBackoff.next()); + } + protected void reScheduleReadInMs(long readAfterMs) { if (isRescheduleReadInProgress.compareAndSet(false, true)) { if (log.isDebugEnabled()) { @@ -697,14 +730,15 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, int entriesDispatched = lastNumberOfEntriesDispatched; updatePendingBytesToDispatch(-totalBytesSize); if (triggerReadingMore) { - if (entriesDispatched > 0) { + if (entriesDispatched > 0 || skipNextBackoff) { + skipNextBackoff = false; // Reset the backoff when we successfully dispatched messages retryBackoff.reset(); // Call readMoreEntries in the same thread to trigger the next read readMoreEntries(); } else if (entriesDispatched == 0) { // If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay - reScheduleReadInMs(retryBackoff.next()); + reScheduleReadWithBackoff(); } } } @@ -754,8 +788,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis int remainingMessages = 0; boolean hasChunk = false; for (int i = 0; i < metadataArray.length; i++) { - final MessageMetadata metadata = Commands.peekAndCopyMessageMetadata( - entries.get(i).getDataBuffer(), subscription.toString(), -1); + Entry entry = entries.get(i); + MessageMetadata metadata = entry instanceof EntryAndMetadata ? ((EntryAndMetadata) entry).getMetadata() + : Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1); if (metadata != null) { remainingMessages += metadata.getNumMessagesInBatch(); if (!hasChunk && metadata.hasUuid()) { @@ -788,22 +823,27 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // round-robin dispatch batch size for this consumer int availablePermits = c.isWritable() ? c.getAvailablePermits() : 1; - if (c.getMaxUnackedMessages() > 0) { - // Avoid negative number - int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0); - availablePermits = Math.min(availablePermits, remainUnAckedMessages); - } if (log.isDebugEnabled() && !c.isWritable()) { log.debug("[{}-{}] consumer is not writable. 
dispatching only 1 message to {}; " + "availablePermits are {}", topic.getName(), name, c, c.getAvailablePermits()); } - int messagesForC = Math.min(Math.min(remainingMessages, availablePermits), - serviceConfig.getDispatcherMaxRoundRobinBatchSize()); - messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1); - - int end = Math.min(start + messagesForC, entries.size()); + int maxMessagesInThisBatch = + Math.max(remainingMessages, serviceConfig.getDispatcherMaxRoundRobinBatchSize()); + if (c.getMaxUnackedMessages() > 0) { + // Calculate the maximum number of additional unacked messages allowed + int maxAdditionalUnackedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0); + maxMessagesInThisBatch = Math.min(maxMessagesInThisBatch, maxAdditionalUnackedMessages); + } + int maxEntriesInThisBatch = Math.min(availablePermits, + // use the average batch size per message to calculate the number of entries to + // dispatch. round up to the next integer without using floating point arithmetic. + (maxMessagesInThisBatch + avgBatchSizePerMsg - 1) / avgBatchSizePerMsg); + // pick at least one entry to dispatch + maxEntriesInThisBatch = Math.max(maxEntriesInThisBatch, 1); + + int end = Math.min(start + maxEntriesInThisBatch, entries.size()); List entriesForThisConsumer = entries.subList(start, end); // remove positions first from replay list first : sendMessages recycles entries @@ -817,6 +857,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size()); EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size()); + totalEntries += filterEntriesForConsumer(metadataArray, start, entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay, c); @@ -826,8 +867,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis int msgSent = sendMessageInfo.getTotalMessages(); remainingMessages -= msgSent; - start += messagesForC; - entriesToDispatch -= messagesForC; + start += maxEntriesInThisBatch; + entriesToDispatch -= maxEntriesInThisBatch; TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -(msgSent - batchIndexesAcks.getTotalAckedIndexCount())); if (log.isDebugEnabled()) { @@ -848,8 +889,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - long stickyKeyHash = getStickyKeyHash(entry); - addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); + addEntryToReplay(entry); entry.release(); }); } @@ -857,6 +897,11 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis return true; } + protected void addEntryToReplay(Entry entry) { + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); + } + private boolean sendChunkedMessagesToConsumers(ReadType readType, List entries, MessageMetadata[] metadataArray) { @@ -1021,7 +1066,7 @@ protected int getFirstAvailableConsumerPermits() { return 0; } for (Consumer consumer : consumerList) { - if (consumer != null && !consumer.isBlocked()) { + if (consumer != null && !consumer.isBlocked() && consumer.cnx().isActive()) { int availablePermits = consumer.getAvailablePermits(); if (availablePermits > 0) { return availablePermits; @@ -1045,7 +1090,8 @@ private boolean isConsumerWritable() { @Override public 
boolean isConsumerAvailable(Consumer consumer) { - return consumer != null && !consumer.isBlocked() && consumer.getAvailablePermits() > 0; + return consumer != null && !consumer.isBlocked() && consumer.cnx().isActive() + && consumer.getAvailablePermits() > 0; } @Override @@ -1235,16 +1281,20 @@ protected synchronized NavigableSet getMessagesToReplayNow(int maxMess delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); messagesAvailableNow.forEach(p -> redeliveryMessages.add(p.getLedgerId(), p.getEntryId())); } - if (!redeliveryMessages.isEmpty()) { - return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); + return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, createFilterForReplay()); } else { return Collections.emptyNavigableSet(); } } + protected Optional getFirstPositionInReplay() { + return redeliveryMessages.getFirstPositionInReplay(); + } + /** - * This is a mode method designed for Key_Shared mode. + * Creates a stateful filter for filtering replay positions. + * This is only used for Key_Shared mode to skip replaying certain entries. * Filter out the entries that will be discarded due to the order guarantee mechanism of Key_Shared mode. * This method is in order to avoid the scenario below: * - Get positions from the Replay queue. @@ -1252,18 +1302,20 @@ protected synchronized NavigableSet getMessagesToReplayNow(int maxMess * - The order guarantee mechanism of Key_Shared mode filtered out all the entries. * - Delivery non entry to the client, but we did a BK read. */ - protected NavigableSet filterOutEntriesWillBeDiscarded(NavigableSet src) { - return src; + protected Predicate createFilterForReplay() { + // pick all positions from the replay + return position -> true; } /** - * This is a mode method designed for Key_Shared mode, to avoid unnecessary stuck. - * See detail {@link PersistentStickyKeyDispatcherMultipleConsumers#hasConsumersNeededNormalRead}. + * Checks if the dispatcher is allowed to read messages from the cursor. 
*/ - protected boolean hasConsumersNeededNormalRead() { + protected boolean isNormalReadAllowed() { return true; } + + protected synchronized boolean shouldPauseDeliveryForDelayTracker() { return delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().shouldPauseAllDeliveries(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index e8e4919a9be52..d45b9394dd744 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -19,21 +19,18 @@ package org.apache.pulsar.broker.service.persistent; import com.google.common.annotations.VisibleForTesting; -import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.NavigableSet; +import java.util.Optional; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; +import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -42,6 +39,8 @@ import org.apache.bookkeeper.mledger.PositionFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; @@ -68,7 +67,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi private final boolean allowOutOfOrderDelivery; private final StickyKeyConsumerSelector selector; - private boolean isDispatcherStuckOnReplays = false; + private boolean skipNextReplayToTriggerLookAhead = false; private final KeySharedMode keySharedMode; /** @@ -183,22 +182,6 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE } } - private static final FastThreadLocal>> localGroupedEntries = - new FastThreadLocal>>() { - @Override - protected Map> initialValue() throws Exception { - return new HashMap<>(); - } - }; - - private static final FastThreadLocal>> localGroupedPositions = - new FastThreadLocal>>() { - @Override - protected Map> initialValue() throws Exception { - return new HashMap<>(); - } - }; - @Override protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List entries) { lastNumberOfEntriesDispatched = 0; @@ -221,15 +204,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis if (!allowOutOfOrderDelivery) { // A corner case that we have to retry a readMoreEntries in order to preserver order delivery. // This may happen when consumer closed. See issue #12885 for details. 
- NavigableSet messagesToReplayNow = this.getMessagesToReplayNow(1); - if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) { - Position replayPosition = messagesToReplayNow.first(); - - // We have received a message potentially from the delayed tracker and, since we're not using it - // right now, it needs to be added to the redelivery tracker or we won't attempt anymore to - // resend it (until we disconnect consumer). - redeliveryMessages.add(replayPosition.getLedgerId(), replayPosition.getEntryId()); - + Optional firstReplayPosition = getFirstPositionInReplay(); + if (firstReplayPosition.isPresent()) { + Position replayPosition = firstReplayPosition.get(); if (this.minReplayedPosition != null) { // If relayPosition is a new entry wither smaller position is inserted for redelivery during this // async read, it is possible that this relayPosition should dispatch to consumer first. So in @@ -274,96 +251,58 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - final Map> groupedEntries = localGroupedEntries.get(); - groupedEntries.clear(); - final Map> consumerStickyKeyHashesMap = new HashMap<>(); - - for (Entry entry : entries) { - int stickyKeyHash = getStickyKeyHash(entry); - Consumer c = selector.select(stickyKeyHash); - if (c != null) { - groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); - consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); - } else { - addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); - entry.release(); - } - } - - AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); + // returns a boolean indicating whether look-ahead could be useful, when there's a consumer + // with available permits, and it's not able to make progress because of blocked hashes. 
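+ // (for example, when all entries read so far route to blocked hashes while another consumer
+ // still has available permits, reading further ahead may surface entries that it can receive)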
+ MutableBoolean triggerLookAhead = new MutableBoolean(); + // filter and group the entries by consumer for dispatching + final Map> entriesByConsumerForDispatching = + filterAndGroupEntriesForDispatching(entries, readType, triggerLookAhead); - int currentThreadKeyNumber = groupedEntries.size(); - if (currentThreadKeyNumber == 0) { - currentThreadKeyNumber = -1; - } - for (Map.Entry> current : groupedEntries.entrySet()) { + AtomicInteger remainingConsumersToFinishSending = new AtomicInteger(entriesByConsumerForDispatching.size()); + for (Map.Entry> current : entriesByConsumerForDispatching.entrySet()) { Consumer consumer = current.getKey(); - assert consumer != null; // checked when added to groupedEntries - List entriesWithSameKey = current.getValue(); - int entriesWithSameKeyCount = entriesWithSameKey.size(); - int availablePermits = getAvailablePermits(consumer); - int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, - entriesWithSameKey.stream().map(Entry::getPosition).collect(Collectors.toList()), availablePermits, - readType, consumerStickyKeyHashesMap.get(consumer)); + List entriesForConsumer = current.getValue(); if (log.isDebugEnabled()) { log.debug("[{}] select consumer {} with messages num {}, read type is {}", - name, consumer.consumerName(), messagesForC, readType); + name, consumer.consumerName(), entriesForConsumer.size(), readType); } - - if (messagesForC < entriesWithSameKeyCount) { - // We are not able to push all the messages with given key to its consumer, - // so we discard for now and mark them for later redelivery - for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { - Entry entry = entriesWithSameKey.get(i); - long stickyKeyHash = getStickyKeyHash(entry); - addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); - entry.release(); - entriesWithSameKey.set(i, null); + final ManagedLedger managedLedger = cursor.getManagedLedger(); + for (Entry entry : entriesForConsumer) { + // remove positions first from replay list first : sendMessages recycles entries + if (readType == ReadType.Replay) { + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } - } - - if (messagesForC > 0) { - final ManagedLedger managedLedger = cursor.getManagedLedger(); - for (int i = 0; i < messagesForC; i++) { - final Entry entry = entriesWithSameKey.get(i); - // remove positions first from replay list first : sendMessages recycles entries - if (readType == ReadType.Replay) { - redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); - } - // Add positions to individuallySentPositions if necessary - if (!allowOutOfOrderDelivery) { - final Position position = entry.getPosition(); - // Store to individuallySentPositions even if lastSentPosition is null - if ((lastSentPosition == null || position.compareTo(lastSentPosition) > 0) - && !individuallySentPositions.contains(position.getLedgerId(), position.getEntryId())) { - final Position previousPosition = managedLedger.getPreviousPosition(position); - individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(), - previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); - } + // Add positions to individuallySentPositions if necessary + if (!allowOutOfOrderDelivery) { + final Position position = entry.getPosition(); + // Store to individuallySentPositions even if lastSentPosition is null + if ((lastSentPosition == null || position.compareTo(lastSentPosition) > 0) + && !individuallySentPositions.contains(position.getLedgerId(), position.getEntryId())) { 
+ final Position previousPosition = managedLedger.getPreviousPosition(position); + individuallySentPositions.addOpenClosed(previousPosition.getLedgerId(), + previousPosition.getEntryId(), position.getLedgerId(), position.getEntryId()); } } + } - SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); - EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC); - EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC); - totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, - batchIndexesAcks, cursor, readType == ReadType.Replay, consumer); - consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, - sendMessageInfo.getTotalMessages(), - sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), - getRedeliveryTracker()).addListener(future -> { - if (future.isDone() && keyNumbers.decrementAndGet() == 0) { - readMoreEntries(); - } - }); + SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); + EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size()); + EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForConsumer.size()); + totalEntries += filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, + batchIndexesAcks, cursor, readType == ReadType.Replay, consumer); + consumer.sendMessages(entriesForConsumer, batchSizes, batchIndexesAcks, + sendMessageInfo.getTotalMessages(), + sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), + getRedeliveryTracker()).addListener(future -> { + if (future.isDone() && remainingConsumersToFinishSending.decrementAndGet() == 0) { + readMoreEntries(); + } + }); - TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, - -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); - totalMessagesSent += sendMessageInfo.getTotalMessages(); - totalBytesSent += sendMessageInfo.getTotalBytes(); - } else { - currentThreadKeyNumber = keyNumbers.decrementAndGet(); - } + TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, + -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); + totalMessagesSent += sendMessageInfo.getTotalMessages(); + totalBytesSent += sendMessageInfo.getTotalBytes(); } // Update the last sent position and remove ranges from individuallySentPositions if necessary @@ -426,41 +365,250 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); - if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) { - // This means, that all the messages we've just read cannot be dispatched right now. - // This condition can only happen when: - // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) - // 2. All keys in the current set of messages are routing to consumers that are currently busy - // - // The solution here is to move on and read next batch of messages which might hopefully contain - // also keys meant for other consumers. - // - // We do it unless that are "recently joined consumers". In that case, we would be looking - // ahead in the stream while the new consumers are not ready to accept the new messages, - // therefore would be most likely only increase the distance between read-position and mark-delete - // position. 
- isDispatcherStuckOnReplays = true; + // trigger read more messages if necessary + if (triggerLookAhead.booleanValue()) { + // When all messages get filtered and no messages are sent, we should read more entries, "look ahead" + // so that a possible next batch of messages might contain messages that can be dispatched. + // This is done only when there's a consumer with available permits, and it's not able to make progress + // because of blocked hashes. Without this rule we would be looking ahead in the stream while the + // new consumers are not ready to accept the new messages, + // therefore would be most likely only increase the distance between read-position and mark-delete position. + skipNextReplayToTriggerLookAhead = true; + // skip backoff delay before reading ahead in the "look ahead" mode to prevent any additional latency + skipNextBackoff = true; return true; - } else if (currentThreadKeyNumber == 0) { + } + + // if no messages were sent, we should retry after a backoff delay + if (entriesByConsumerForDispatching.size() == 0) { return true; } + return false; } - private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, - int availablePermits, ReadType readType, Set stickyKeyHashes) { - int maxMessages = Math.min(entries.size(), availablePermits); - if (maxMessages == 0) { - return 0; + private boolean isReplayQueueSizeBelowLimit() { + return redeliveryMessages.size() < getEffectiveLookAheadLimit(); + } + + private int getEffectiveLookAheadLimit() { + return getEffectiveLookAheadLimit(serviceConfig, consumerList.size()); + } + + static int getEffectiveLookAheadLimit(ServiceConfiguration serviceConfig, int consumerCount) { + int perConsumerLimit = serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerConsumer(); + int perSubscriptionLimit = serviceConfig.getKeySharedLookAheadMsgInReplayThresholdPerSubscription(); + int effectiveLimit; + if (perConsumerLimit <= 0) { + effectiveLimit = perSubscriptionLimit; + } else { + effectiveLimit = perConsumerLimit * consumerCount; + if (perSubscriptionLimit > 0 && perSubscriptionLimit < effectiveLimit) { + effectiveLimit = perSubscriptionLimit; + } } - if (readType == ReadType.Normal && stickyKeyHashes != null - && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) { - // If redeliveryMessages contains messages that correspond to the same hash as the messages - // that the dispatcher is trying to send, do not send those messages for order guarantee - return 0; + if (effectiveLimit <= 0) { + // use max unacked messages limits if key shared look-ahead limits are disabled + int maxUnackedMessagesPerSubscription = serviceConfig.getMaxUnackedMessagesPerSubscription(); + if (maxUnackedMessagesPerSubscription <= 0) { + maxUnackedMessagesPerSubscription = Integer.MAX_VALUE; + } + int maxUnackedMessagesByConsumers = consumerCount * serviceConfig.getMaxUnackedMessagesPerConsumer(); + if (maxUnackedMessagesByConsumers <= 0) { + maxUnackedMessagesByConsumers = Integer.MAX_VALUE; + } + effectiveLimit = Math.min(maxUnackedMessagesPerSubscription, maxUnackedMessagesByConsumers); } + return effectiveLimit; + } + + // groups the entries by consumer and filters out the entries that should not be dispatched + // the entries are handled in the order they are received instead of first grouping them by consumer and + // then filtering them + private Map> filterAndGroupEntriesForDispatching(List entries, ReadType readType, + MutableBoolean triggerLookAhead) { + // entries grouped by consumer + Map> entriesGroupedByConsumer = new 
HashMap<>(); + // permits for consumer, permits are for entries/batches + Map permitsForConsumer = new HashMap<>(); + // maxLastSentPosition cache for consumers, used when recently joined consumers exist + boolean hasRecentlyJoinedConsumers = hasRecentlyJoinedConsumers(); + Map maxLastSentPositionCache = hasRecentlyJoinedConsumers ? new HashMap<>() : null; + boolean lookAheadAllowed = isReplayQueueSizeBelowLimit(); + // in normal read mode, keep track of consumers that are blocked by hash, to check if look-ahead could be useful + Set blockedByHashConsumers = lookAheadAllowed && readType == ReadType.Normal ? new HashSet<>() : null; + // in replay read mode, keep track of consumers for entries, used for look-ahead check + Set consumersForEntriesForLookaheadCheck = lookAheadAllowed ? new HashSet<>() : null; + + for (Entry entry : entries) { + int stickyKeyHash = getStickyKeyHash(entry); + Consumer consumer = selector.select(stickyKeyHash); + MutableBoolean blockedByHash = null; + boolean dispatchEntry = false; + if (consumer != null) { + if (lookAheadAllowed) { + consumersForEntriesForLookaheadCheck.add(consumer); + } + Position maxLastSentPosition = hasRecentlyJoinedConsumers ? maxLastSentPositionCache.computeIfAbsent( + consumer, __ -> resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, readType)) : null; + blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null; + MutableInt permits = + permitsForConsumer.computeIfAbsent(consumer, + k -> new MutableInt(getAvailablePermits(consumer))); + // a consumer was found for the sticky key hash and the entry can be dispatched + if (permits.intValue() > 0 && canDispatchEntry(entry, readType, stickyKeyHash, + maxLastSentPosition, blockedByHash)) { + // decrement the permits for the consumer + permits.decrement(); + // allow the entry to be dispatched + dispatchEntry = true; + } + } + if (dispatchEntry) { + // add the entry to consumer's entry list for dispatching + List consumerEntries = + entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>()); + consumerEntries.add(entry); + } else { + if (blockedByHash != null && blockedByHash.isTrue()) { + // the entry is blocked by hash, add the consumer to the blocked set + blockedByHashConsumers.add(consumer); + } + // add the message to replay + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); + // release the entry as it will not be dispatched + entry.release(); + } + } + // + // determine whether look-ahead could be useful for making more progress + // + if (lookAheadAllowed && entriesGroupedByConsumer.isEmpty()) { + // check if look-ahead could be useful for the consumers that are blocked by a hash that is in the replay + // queue. This check applies only to the normal read mode. 
+ if (readType == ReadType.Normal) { + for (Consumer consumer : blockedByHashConsumers) { + // if the consumer isn't in the entriesGroupedByConsumer, it means that it won't receive any + // messages + // if it has available permits, then look-ahead could be useful for this particular consumer + // to make further progress + if (!entriesGroupedByConsumer.containsKey(consumer) + && permitsForConsumer.get(consumer).intValue() > 0) { + triggerLookAhead.setTrue(); + break; + } + } + } + // check if look-ahead could be useful for other consumers + if (!triggerLookAhead.booleanValue()) { + for (Consumer consumer : getConsumers()) { + // filter out the consumers that are already checked when the entries were processed for entries + if (!consumersForEntriesForLookaheadCheck.contains(consumer)) { + // if another consumer has available permits, then look-ahead could be useful + if (getAvailablePermits(consumer) > 0) { + triggerLookAhead.setTrue(); + break; + } + } + } + } + } + return entriesGroupedByConsumer; + } + + // checks if the entry can be dispatched to the consumer + private boolean canDispatchEntry(Entry entry, + ReadType readType, int stickyKeyHash, Position maxLastSentPosition, + MutableBoolean blockedByHash) { + // check if the entry can be replayed to a recently joined consumer + if (maxLastSentPosition != null && entry.getPosition().compareTo(maxLastSentPosition) > 0) { + return false; + } + + // If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched + // do not send those messages for order guarantee + if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) { + if (blockedByHash != null) { + blockedByHash.setTrue(); + } + return false; + } + + return true; + } + + /** + * Creates a filter for replaying messages. The filter is stateful and shouldn't be cached or reused. + * @see PersistentDispatcherMultipleConsumers#createFilterForReplay() + */ + @Override + protected Predicate createFilterForReplay() { + return new ReplayPositionFilter(); + } + + /** + * Filter for replaying messages. The filter is stateful for a single invocation and shouldn't be cached, shared + * or reused. This is a short-lived object, and optimizing it for the "no garbage" coding style of Pulsar is + * unnecessary since the JVM can optimize allocations for short-lived objects. + */ + private class ReplayPositionFilter implements Predicate { + // tracks the available permits for each consumer for the duration of the filter usage + // the filter is stateful and shouldn't be shared or reused later + private final Map availablePermitsMap = new HashMap<>(); + private final Map maxLastSentPositionCache = + hasRecentlyJoinedConsumers() ? 
new HashMap<>() : null; + + @Override + public boolean test(Position position) { + // if out of order delivery is allowed, then any position will be replayed + if (isAllowOutOfOrderDelivery()) { + return true; + } + // lookup the sticky key hash for the entry at the replay position + Long stickyKeyHash = redeliveryMessages.getHash(position.getLedgerId(), position.getEntryId()); + if (stickyKeyHash == null) { + // the sticky key hash is missing for delayed messages, the filtering will happen at the time of + // dispatch after reading the entry from the ledger + if (log.isDebugEnabled()) { + log.debug("[{}] replay of entry at position {} doesn't contain sticky key hash.", name, position); + } + return true; + } + // find the consumer for the sticky key hash + Consumer consumer = selector.select(stickyKeyHash.intValue()); + // skip replaying the message position if there's no assigned consumer + if (consumer == null) { + return false; + } + // lookup the available permits for the consumer + MutableInt availablePermits = + availablePermitsMap.computeIfAbsent(consumer, + k -> new MutableInt(getAvailablePermits(consumer))); + // skip replaying the message position if the consumer has no available permits + if (availablePermits.intValue() <= 0) { + return false; + } + // check if the entry position can be replayed to a recently joined consumer + Position maxLastSentPosition = maxLastSentPositionCache != null + ? maxLastSentPositionCache.computeIfAbsent(consumer, __ -> + resolveMaxLastSentPositionForRecentlyJoinedConsumer(consumer, ReadType.Replay)) + : null; + if (maxLastSentPosition != null && position.compareTo(maxLastSentPosition) > 0) { + return false; + } + availablePermits.decrement(); + return true; + } + } + + /** + * Contains the logic to resolve the max last sent position for a consumer + * when the consumer has recently joined. This is only applicable for key shared mode when + * allowOutOfOrderDelivery=false. + */ + private Position resolveMaxLastSentPositionForRecentlyJoinedConsumer(Consumer consumer, ReadType readType) { if (recentlyJoinedConsumers == null) { - return maxMessages; + return null; } removeConsumersFromRecentJoinedConsumers(); Position maxLastSentPosition = recentlyJoinedConsumers.get(consumer); @@ -468,7 +616,7 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List 0) { - // We have already crossed the divider line. All messages in the list are now - // newer than what we can currently dispatch to this consumer - return i; - } - } - return maxMessages; + return maxLastSentPosition; } + @Override public void markDeletePositionMoveForward() { // Execute the notification in different thread to avoid a mutex chain here // from the delete operation that was completed topic.getBrokerService().getTopicOrderedExecutor().execute(() -> { synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) { - if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty() + if (hasRecentlyJoinedConsumers() && removeConsumersFromRecentJoinedConsumers()) { // After we process acks, we need to check whether the mark-delete position was advanced and we // can finally read more messages. It's safe to call readMoreEntries() multiple times. 
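The ReplayPositionFilter is stateful for a single selection pass, so a fresh instance is needed per call. A minimal sketch of the intended call pattern, using the names introduced in this change (maxMessagesToRead is assumed from the surrounding dispatcher context):

    // One fresh filter per replay-selection pass: it caches per-consumer permit counts and
    // recently-joined-consumer positions only for the duration of this single pass.
    Predicate<Position> filter = createFilterForReplay(); // returns a new ReplayPositionFilter
    NavigableSet<Position> toReplay =
            redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead, filter);
    // The filter is discarded after the call; reusing it would apply stale permit counts.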
@@ -520,6 +660,10 @@ && removeConsumersFromRecentJoinedConsumers()) { }); } + private boolean hasRecentlyJoinedConsumers() { + return !MapUtils.isEmpty(recentlyJoinedConsumers); + } + private boolean removeConsumersFromRecentJoinedConsumers() { if (MapUtils.isEmpty(recentlyJoinedConsumers)) { return false; @@ -553,99 +697,104 @@ private synchronized Position updateIfNeededAndGetLastSentPosition() { return lastSentPosition; } + /** + * The dispatcher will skip replaying messages when all messages in the replay queue are filtered out when + * skipNextReplayToTriggerLookAhead=true. The flag is reset after the call. + * + * If we're stuck on replay, we want to move forward reading on the topic (until the configured look-ahead + * limits kick in), instead of continuing to replay the same old messages, since the consumer that these + * messages are routing to might be busy at the moment. + * + * Please see {@link ServiceConfiguration#getKeySharedLookAheadMsgInReplayThresholdPerConsumer} and + * {@link ServiceConfiguration#getKeySharedLookAheadMsgInReplayThresholdPerSubscription} for configuring the limits. + */ @Override - protected synchronized NavigableSet getMessagesToReplayNow(int maxMessagesToRead) { - if (isDispatcherStuckOnReplays) { - // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked - // messages kicks in), instead of keep replaying the same old messages, since the consumer that these - // messages are routing to might be busy at the moment - this.isDispatcherStuckOnReplays = false; - return Collections.emptyNavigableSet(); - } else { - return super.getMessagesToReplayNow(maxMessagesToRead); + protected synchronized boolean canReplayMessages() { + if (skipNextReplayToTriggerLookAhead) { + skipNextReplayToTriggerLookAhead = false; + return false; } + return true; } private int getAvailablePermits(Consumer c) { + // skip consumers that are currently closing + if (!c.cnx().isActive()) { + return 0; + } int availablePermits = Math.max(c.getAvailablePermits(), 0); - if (c.getMaxUnackedMessages() > 0) { - // Avoid negative number - int remainUnAckedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0); - availablePermits = Math.min(availablePermits, remainUnAckedMessages); + if (availablePermits > 0 && c.getMaxUnackedMessages() > 0) { + // Calculate the maximum number of additional unacked messages allowed + int maxAdditionalUnackedMessages = Math.max(c.getMaxUnackedMessages() - c.getUnackedMessages(), 0); + if (maxAdditionalUnackedMessages == 0) { + // if the consumer has reached the max unacked messages, then no more messages can be dispatched + return 0; + } + // Estimate the remaining permits based on the average messages per entry + // add "avgMessagesPerEntry - 1" to round up the division to the next integer without the need to use + // floating point arithmetic + int avgMessagesPerEntry = Math.max(c.getAvgMessagesPerEntry(), 1); + int estimatedRemainingPermits = + (maxAdditionalUnackedMessages + avgMessagesPerEntry - 1) / avgMessagesPerEntry; + // return the minimum of current available permits and estimated remaining permits + return Math.min(availablePermits, estimatedRemainingPermits); + } else { + return availablePermits; } - return availablePermits; } + /** + * For Key_Shared subscription, the dispatcher will not read more entries while there are pending reads + * or pending replay reads. 
+ * @return true if there are no pending reads or pending replay reads + */ @Override - protected synchronized NavigableSet filterOutEntriesWillBeDiscarded(NavigableSet src) { - // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", - // So skip this filter out. - if (isAllowOutOfOrderDelivery()) { - return src; - } - if (src.isEmpty()) { - return src; - } - NavigableSet res = new TreeSet<>(); - // Group positions. - final Map> groupedPositions = localGroupedPositions.get(); - groupedPositions.clear(); - for (Position pos : src) { - Long stickyKeyHash = redeliveryMessages.getHash(pos.getLedgerId(), pos.getEntryId()); - if (stickyKeyHash == null) { - res.add(pos); - continue; - } - Consumer c = selector.select(stickyKeyHash.intValue()); - if (c == null) { - // Maybe using HashRangeExclusiveStickyKeyConsumerSelector. - continue; - } - groupedPositions.computeIfAbsent(c, k -> new ArrayList<>()).add(pos); - } - // Filter positions by the Recently Joined Position rule. - for (Map.Entry> item : groupedPositions.entrySet()) { - int availablePermits = getAvailablePermits(item.getKey()); - if (availablePermits == 0) { - continue; - } - int posCountToRead = getRestrictedMaxEntriesForConsumer(item.getKey(), item.getValue(), availablePermits, - ReadType.Replay, null); - if (posCountToRead > 0) { - res.addAll(item.getValue().subList(0, posCountToRead)); - } - } - return res; + protected boolean doesntHavePendingRead() { + return !havePendingRead && !havePendingReplayRead; } /** - * In Key_Shared mode, the consumer will not receive any entries from a normal reading if it is included in - * {@link #recentlyJoinedConsumers}, they can only receive entries from replay reads. - * If all entries in {@link #redeliveryMessages} have been filtered out due to the order guarantee mechanism, - * Broker need a normal read to make the consumers not included in @link #recentlyJoinedConsumers} will not be - * stuck. See https://github.com/apache/pulsar/pull/7105. + * For Key_Shared subscription, the dispatcher will not attempt to read more entries if the replay queue size + * has reached the limit or if there are no consumers with permits. */ @Override - protected boolean hasConsumersNeededNormalRead() { - // The variable "hashesToBeBlocked" and "recentlyJoinedConsumers" will be null if "isAllowOutOfOrderDelivery()", - // So the method "filterOutEntriesWillBeDiscarded" will filter out nothing, just return "true" here. - if (isAllowOutOfOrderDelivery()) { - return true; + protected boolean isNormalReadAllowed() { + // don't allow reading more if the replay queue size has reached the limit + if (!isReplayQueueSizeBelowLimit()) { + return false; } for (Consumer consumer : consumerList) { + // skip blocked consumers if (consumer == null || consumer.isBlocked()) { continue; } - if (recentlyJoinedConsumers.containsKey(consumer)) { - continue; - } - if (consumer.getAvailablePermits() > 0) { + // before reading more, check that there's at least one consumer that has permits + if (getAvailablePermits(consumer) > 0) { return true; } } return false; } + @Override + protected int getMaxEntriesReadLimit() { + // prevent the redelivery queue from growing over the limit by limiting the number of entries to read + // to the maximum number of entries that can be added to the redelivery queue + return Math.max(getEffectiveLookAheadLimit() - redeliveryMessages.size(), 1); + } + + /** + * When a normal read is not allowed, the dispatcher will reschedule a read with a backoff. 
+ */ + @Override + protected void handleNormalReadNotAllowed() { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Skipping read for the topic since normal read isn't allowed. " + + "Rescheduling a read with a backoff.", topic.getName(), getSubscriptionName()); + } + reScheduleReadWithBackoff(); + } + @Override public SubType getType() { return SubType.Key_Shared; @@ -702,5 +851,4 @@ public Map> getConsumerKeyHashRanges() { } private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class); - } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java index e42cae2580b78..cc1eae475fa2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java @@ -22,10 +22,12 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; +import java.util.Optional; import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.common.util.collections.LongPairSet; import org.roaringbitmap.RoaringBitmap; @@ -93,25 +95,51 @@ public void removeUpTo(long item1, long item2) { } } + public > Optional first(LongPairSet.LongPairFunction longPairConverter) { + MutableObject> result = new MutableObject<>(Optional.empty()); + processItems(longPairConverter, item -> { + result.setValue(Optional.of(item)); + return false; + }); + return result.getValue(); + } public > NavigableSet items(int numberOfItems, LongPairSet.LongPairFunction longPairConverter) { NavigableSet items = new TreeSet<>(); + processItems(longPairConverter, item -> { + items.add(item); + return items.size() < numberOfItems; + }); + return items; + } + + public interface ItemProcessor> { + /** + * @param item + * @return false if there is no further processing required + */ + boolean process(T item); + } + + public > void processItems(LongPairSet.LongPairFunction longPairConverter, + ItemProcessor itemProcessor) { lock.readLock().lock(); try { for (Map.Entry entry : map.entrySet()) { Iterator iterator = entry.getValue().stream().iterator(); - while (iterator.hasNext() && items.size() < numberOfItems) { - items.add(longPairConverter.apply(entry.getKey(), iterator.next())); + boolean continueProcessing = true; + while (continueProcessing && iterator.hasNext()) { + T item = longPairConverter.apply(entry.getKey(), iterator.next()); + continueProcessing = itemProcessor.process(item); } - if (items.size() == numberOfItems) { + if (!continueProcessing) { break; } } } finally { lock.readLock().unlock(); } - return items; } public boolean isEmpty() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index bfb172d0711d4..5641816ee0b80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -18,8 +18,31 @@ */ package org.apache.pulsar.broker; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectWriter; +import java.io.IOException; +import java.io.StringWriter; +import 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index bfb172d0711d4..5641816ee0b80 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -18,8 +18,31 @@
  */
 package org.apache.pulsar.broker;

+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Arrays;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.Mockito;
+import org.slf4j.Logger;

 /**
  * Holds util methods used in test.
@@ -77,4 +100,138 @@ public static <T> T spyWithoutRecordingInvocations(T object) {
                 .defaultAnswer(Mockito.CALLS_REAL_METHODS)
                 .stubOnly());
     }
+
+    /**
+     * Uses Jackson to create a JSON string for the given object.
+     * @param object the object to convert to JSON
+     * @return JSON string
+     */
+    public static String toJson(Object object) {
+        ObjectWriter writer = ObjectMapperFactory.getMapper().writer();
+        StringWriter stringWriter = new StringWriter();
+        try (JsonGenerator generator = writer.createGenerator(stringWriter).useDefaultPrettyPrinter()) {
+            generator.writeObject(object);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        return stringWriter.toString();
+    }
+
+    /**
+     * Logs the topic stats and internal stats for the given topic.
+     * @param logger logger to use
+     * @param pulsarAdmin PulsarAdmin client to use
+     * @param topic topic name
+     */
+    public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) {
+        try {
+            logger.info("[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic)));
+            logger.info("[{}] internalStats: {}", topic,
+                    toJson(pulsarAdmin.topics().getInternalStats(topic, true)));
+        } catch (PulsarAdminException e) {
+            logger.warn("Failed to get stats for topic {}", topic, e);
+        }
+    }
+
+    /**
+     * Receives messages concurrently from multiple consumers and handles them with the provided message handler.
+     * The message handler should return true to continue receiving more messages, false otherwise.
+     *
+     * @param messageHandler the message handler
+     * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
+     * @param consumers the consumers to receive messages from
+     * @param <T> the message value type
+     */
+    public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
+                                           Duration quietTimeout,
+                                           Consumer<T>... consumers) {
+        FutureUtil.waitForAll(Arrays.stream(consumers)
+                .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, messageHandler)).toList()).join();
+    }
+
+    // Asynchronously receives messages from a consumer and handles them with the provided message handler.
+    // The benefit is that messages from multiple consumers can be processed concurrently without needing a
+    // dedicated thread per consumer. This is useful in tests that consume from several consumers at once.
+    private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer, Duration quietTimeout,
+                                                                    BiFunction<Consumer<T>, Message<T>, Boolean>
+                                                                            messageHandler) {
+        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
+        return receiveFuture
+                .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS)
+                .handle((msg, t) -> {
+                    if (t != null) {
+                        if (t instanceof TimeoutException) {
+                            // cancel the receive future so that the Pulsar client can clean up the resources
+                            receiveFuture.cancel(false);
+                            return false;
+                        } else {
+                            throw FutureUtil.wrapToCompletionException(t);
+                        }
+                    }
+                    return messageHandler.apply(consumer, msg);
+                }).thenComposeAsync(receiveMore -> {
+                    if (receiveMore) {
+                        return receiveMessagesAsync(consumer, quietTimeout, messageHandler);
+                    } else {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                });
+    }
+
+    /**
+     * Receives messages concurrently from multiple consumers and handles them with the provided message handler.
+     * Messages are received until the quiet timeout is reached or the maximum number of messages is received.
+     *
+     * @param messageHandler the message handler
+     * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
+     * @param maxMessages the maximum number of messages to receive
+     * @param consumers the consumers to receive messages from
+     * @param <T> the message value type
+     */
+    public static <T> void receiveMessagesN(BiConsumer<Consumer<T>, Message<T>> messageHandler,
+                                            Duration quietTimeout,
+                                            int maxMessages,
+                                            Consumer<T>... consumers)
+            throws ExecutionException, InterruptedException {
+        AtomicInteger messagesReceived = new AtomicInteger();
+        receiveMessages(
+                (consumer, message) -> {
+                    messageHandler.accept(consumer, message);
+                    return messagesReceived.incrementAndGet() < maxMessages;
+                }, quietTimeout, consumers);
+    }
+
+    /**
+     * Receives messages concurrently from multiple consumers and handles them with the provided message handler,
+     * using a dedicated thread per consumer.
+     *
+     * @param messageHandler the message handler
+     * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
+     * @param consumers the consumers to receive messages from
+     * @param <T> the message value type
+     */
+    public static <T> void receiveMessagesInThreads(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
+                                                    final Duration quietTimeout,
+                                                    Consumer<T>... consumers) {
+        FutureUtil.waitForAll(Arrays.stream(consumers).sequential().map(consumer -> {
+            return CompletableFuture.runAsync(() -> {
+                try {
+                    while (!Thread.currentThread().isInterrupted()) {
+                        Message<T> msg = consumer.receive((int) quietTimeout.toMillis(), TimeUnit.MILLISECONDS);
+                        if (msg != null) {
+                            if (!messageHandler.apply(consumer, msg)) {
+                                break;
+                            }
+                        } else {
+                            break;
+                        }
+                    }
+                } catch (PulsarClientException e) {
+                    throw new CompletionException(e);
+                }
+            }, runnable -> {
+                Thread thread = new Thread(runnable, "Consumer-" + consumer.getConsumerName());
+                thread.start();
+            });
+        }).toList()).join();
+    }
 }
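A hedged usage sketch of the single-threaded helper: drain two pre-subscribed consumers, acknowledging everything, until the subscription has been quiet for two seconds. The consumer variables and the Integer value type are placeholders, not part of the patch.

    // Hypothetical test snippet: consumer1/consumer2 are pre-subscribed Consumer<Integer> instances.
    Set<Integer> received = Collections.synchronizedSet(new HashSet<>());
    BrokerTestUtil.receiveMessages((consumer, msg) -> {
        try {
            consumer.acknowledge(msg);
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
        received.add(msg.getValue());
        return true; // keep receiving; the quiet timeout ends the loop
    }, Duration.ofSeconds(2), consumer1, consumer2);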
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 1cc20b04c2137..e32af29c7e962 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -626,21 +626,23 @@ public void testGetTransactionBufferInternalStats() throws Exception {
         producer.newMessage(transaction).send();
         transaction.abort().get();

-        // Get transaction buffer internal stats and verify single snapshot stats
-        TransactionBufferInternalStats stats = admin.transactions()
-                .getTransactionBufferInternalStatsAsync(topic2, true).get();
-        assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Single.toString());
-        assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
-
-        // Get managed ledger internal stats for the transaction buffer snapshot topic
-        PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-        verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
-                internalStats);
-        assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
-                .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
-        assertNull(stats.segmentInternalStats);
-        assertNull(stats.segmentIndexInternalStats);
+        Awaitility.await().untilAsserted(() -> {
+            // Get transaction buffer internal stats and verify single snapshot stats
+            TransactionBufferInternalStats stats = admin.transactions()
+                    .getTransactionBufferInternalStatsAsync(topic2, true).get();
+            assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Single.toString());
+            assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
+
+            // Get managed ledger internal stats for the transaction buffer snapshot topic
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+            verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
+                    internalStats);
+            assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
+                    .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+            assertNull(stats.segmentInternalStats);
+            assertNull(stats.segmentIndexInternalStats);
+        });

         // Configure segmented snapshot and set segment size
         pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
@@ -652,28 +654,31 @@ public void testGetTransactionBufferInternalStats() throws Exception {
         producer.newMessage(transaction).send();
         transaction.abort().get();

-        // Get transaction buffer internal stats and verify segmented snapshot stats
-        stats = admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
-        assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Segment.toString());
-        assertNull(stats.singleSnapshotSystemTopicInternalStats);
-        assertNotNull(stats.segmentInternalStats);
-
-        // Get managed ledger internal stats for the transaction buffer segments topic
-        internalStats = admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" +
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
-        verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats, internalStats);
-        assertTrue(stats.segmentInternalStats.managedLedgerName
-                .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
-
-        // Get managed ledger internal stats for the transaction buffer indexes topic
-        assertNotNull(stats.segmentIndexInternalStats);
-        internalStats = admin.topics().getInternalStats(
-                TopicName.get(topic2).getNamespace() + "/" +
-                        SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
-        verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats, internalStats);
-        assertTrue(stats.segmentIndexInternalStats.managedLedgerName
-                .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+        Awaitility.await().untilAsserted(() -> {
+            // Get transaction buffer internal stats and verify segmented snapshot stats
+            TransactionBufferInternalStats stats =
+                    admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
+            assertEquals(stats.snapshotType, AbortedTxnProcessor.SnapshotType.Segment.toString());
+            assertNull(stats.singleSnapshotSystemTopicInternalStats);
+            assertNotNull(stats.segmentInternalStats);
+
+            // Get managed ledger internal stats for the transaction buffer segments topic
+            PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" +
+                            SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+            verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats, internalStats);
+            assertTrue(stats.segmentInternalStats.managedLedgerName
+                    .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+
+            // Get managed ledger internal stats for the transaction buffer indexes topic
+            assertNotNull(stats.segmentIndexInternalStats);
+            internalStats = admin.topics().getInternalStats(
+                    TopicName.get(topic2).getNamespace() + "/" +
+                            SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+            verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
+                    internalStats);
+            assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+                    .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+        });
     }
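Wrapping the assertion blocks in Awaitility retries them until they pass, presumably because the transaction buffer snapshot is written to its system topic asynchronously after the abort, so a one-shot read of the stats could observe a snapshot that has not been written yet. The pattern in isolation (topic2 and the single assertion are stand-ins for the full blocks above):

    // Retry the assertions until they pass or Awaitility's timeout expires.
    Awaitility.await().untilAsserted(() -> {
        TransactionBufferInternalStats stats = admin.transactions()
                .getTransactionBufferInternalStatsAsync(topic2, true).get();
        assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
    });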
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index c83888b8022b3..8dd2fc1c3c26d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,6 +45,7 @@
 import javax.ws.rs.container.TimeoutHandler;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
@@ -759,5 +760,9 @@ protected void assertOtelMetricLongSumValue(String metricName, int value) {
                 sum -> sum.hasPointsSatisfying(point -> point.hasValue(value))));
     }

+    protected void logTopicStats(String topic) {
+        BrokerTestUtil.logTopicStats(log, admin, topic);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/KeySharedLookAheadConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/KeySharedLookAheadConfigTest.java
new file mode 100644
index 0000000000000..cf028cf369d7b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/KeySharedLookAheadConfigTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.getEffectiveLookAheadLimit;
+import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+public class KeySharedLookAheadConfigTest {
+
+    @Test
+    public void testGetEffectiveLookAheadLimit() {
+        ServiceConfiguration config = new ServiceConfiguration();
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(5);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(100);
+        assertEquals(getEffectiveLookAheadLimit(config, 5), 25);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), 100);
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(5);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(0);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), 500);
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(0);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(6000);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), 6000);
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(0);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(0);
+        config.setMaxUnackedMessagesPerConsumer(0);
+        config.setMaxUnackedMessagesPerSubscription(0);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), Integer.MAX_VALUE);
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(0);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(0);
+        config.setMaxUnackedMessagesPerConsumer(1);
+        config.setMaxUnackedMessagesPerSubscription(10);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), 10);
+
+        config.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(0);
+        config.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(0);
+        config.setMaxUnackedMessagesPerConsumer(22);
+        config.setMaxUnackedMessagesPerSubscription(0);
+        assertEquals(getEffectiveLookAheadLimit(config, 100), 2200);
+    }
+}
\ No newline at end of file
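To make the first assertions concrete: with a per-consumer threshold of 5 and 5 connected consumers, the effective limit is min(5 * 5, 100) = 25; with 100 consumers, the per-subscription cap wins and min(5 * 100, 100) = 100. When both thresholds are set to 0, the test shows the limit falling back to the unacked-message settings, for example min(1 * 100, 10) = 10, and to Integer.MAX_VALUE when those are disabled as well.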
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 2222c8156e011..1708dc7bc2536 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -225,12 +225,12 @@ public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) throws Exception {

         if (allowOutOfOrderDelivery) {
             // The entries are sorted by ledger ID but not by entry ID
-            Position[] actual1 = controller.getMessagesToReplayNow(3).toArray(new Position[3]);
+            Position[] actual1 = controller.getMessagesToReplayNow(3, item -> true).toArray(new Position[3]);
             Position[] expected1 = { PositionFactory.create(1, 1), PositionFactory.create(1, 2), PositionFactory.create(1, 3) };
             assertEqualsNoOrder(actual1, expected1);
         } else {
             // The entries are completely sorted
-            Set<Position> actual2 = controller.getMessagesToReplayNow(6);
+            Set<Position> actual2 = controller.getMessagesToReplayNow(6, item -> true);
             Set<Position> expected2 = new TreeSet<>();
             expected2.add(PositionFactory.create(1, 1));
             expected2.add(PositionFactory.create(1, 2));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index a7ff9eb9c11f2..b78d1e554c32d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -74,6 +74,7 @@
 import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.RedeliveryTracker;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.TransportCnx;
 import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
 import org.apache.pulsar.common.api.proto.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.KeySharedMode;
@@ -120,7 +121,7 @@ public void setup() throws Exception {
         doReturn(100).when(configMock).getDispatcherMaxReadBatchSize();
         doReturn(true).when(configMock).isSubscriptionKeySharedUseConsistentHashing();
         doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
-        doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
+        doReturn(false).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
         doReturn(false).when(configMock).isAllowOverrideEntryFilters();
         doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
         doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
@@ -191,7 +192,7 @@ public void setup() throws Exception {
         doReturn(subscriptionName).when(cursorMock).getName();
         doReturn(ledgerMock).when(cursorMock).getManagedLedger();

-        consumerMock = mock(Consumer.class);
+        consumerMock = createMockConsumer();
         channelMock = mock(ChannelPromise.class);
         doReturn("consumer1").when(consumerMock).consumerName();
         consumerMockAvailablePermits = new AtomicInteger(1000);
@@ -214,6 +215,14 @@ public void setup() throws Exception {
                 new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT));
     }

+    protected static Consumer createMockConsumer() {
+        Consumer consumerMock = mock(Consumer.class);
+        TransportCnx transportCnx = mock(TransportCnx.class);
+        doReturn(transportCnx).when(consumerMock).cnx();
+        doReturn(true).when(transportCnx).isActive();
+        return consumerMock;
+    }
+
     @AfterMethod(alwaysRun = true)
     public void cleanup() {
         if (persistentDispatcher != null && !persistentDispatcher.isClosed()) {
@@ -228,7 +237,7 @@ public void cleanup() {
     @Test(timeOut = 10000)
     public void testAddConsumerWhenClosed() throws Exception {
         persistentDispatcher.close().get();
-        Consumer consumer = mock(Consumer.class);
+        Consumer consumer = createMockConsumer();
         persistentDispatcher.addConsumer(consumer);
         verify(consumer, times(1)).disconnect();
         assertEquals(0, persistentDispatcher.getConsumers().size());
doReturn("consumer1").when(consumerMock).consumerName(); consumerMockAvailablePermits = new AtomicInteger(1000); @@ -214,6 +215,14 @@ public void setup() throws Exception { new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)); } + protected static Consumer createMockConsumer() { + Consumer consumerMock = mock(Consumer.class); + TransportCnx transportCnx = mock(TransportCnx.class); + doReturn(transportCnx).when(consumerMock).cnx(); + doReturn(true).when(transportCnx).isActive(); + return consumerMock; + } + @AfterMethod(alwaysRun = true) public void cleanup() { if (persistentDispatcher != null && !persistentDispatcher.isClosed()) { @@ -228,7 +237,7 @@ public void cleanup() { @Test(timeOut = 10000) public void testAddConsumerWhenClosed() throws Exception { persistentDispatcher.close().get(); - Consumer consumer = mock(Consumer.class); + Consumer consumer = createMockConsumer(); persistentDispatcher.addConsumer(consumer); verify(consumer, times(1)).disconnect(); assertEquals(0, persistentDispatcher.getConsumers().size()); @@ -286,7 +295,7 @@ public void testSendMessage() { .setStart(0) .setEnd(9); - Consumer consumerMock = mock(Consumer.class); + Consumer consumerMock = createMockConsumer(); doReturn(keySharedMeta).when(consumerMock).getKeySharedMeta(); persistentDispatcher.addConsumer(consumerMock); persistentDispatcher.consumerFlow(consumerMock, 1000); @@ -308,7 +317,7 @@ public void testSendMessage() { @Test public void testSkipRedeliverTemporally() { - final Consumer slowConsumerMock = mock(Consumer.class); + final Consumer slowConsumerMock = createMockConsumer(); final ChannelPromise slowChannelMock = mock(ChannelPromise.class); // add entries to redeliver and read target final List redeliverEntries = new ArrayList<>(); @@ -336,7 +345,6 @@ public void testSkipRedeliverTemporally() { // Create 2Consumers try { doReturn("consumer2").when(slowConsumerMock).consumerName(); - // Change slowConsumer availablePermits to 0 and back to normal when(slowConsumerMock.getAvailablePermits()) .thenReturn(0) .thenReturn(1); @@ -362,28 +370,24 @@ public void testSkipRedeliverTemporally() { // Change slowConsumer availablePermits to 1 // run PersistentStickyKeyDispatcherMultipleConsumers#sendMessagesToConsumers internally // and then stop to dispatch to slowConsumer - if (persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal, - redeliverEntries, true)) { - persistentDispatcher.readMoreEntriesAsync(); - } - - Awaitility.await().untilAsserted(() -> { - verify(consumerMock, times(1)).sendMessages( - argThat(arg -> { - assertEquals(arg.size(), 1); - Entry entry = arg.get(0); - assertEquals(entry.getLedgerId(), 1); - assertEquals(entry.getEntryId(), 3); - return true; - }), - any(EntryBatchSizes.class), - any(EntryBatchIndexesAcks.class), - anyInt(), - anyLong(), - anyLong(), - any(RedeliveryTracker.class) - ); - }); + persistentDispatcher.readEntriesComplete(redeliverEntries, + PersistentDispatcherMultipleConsumers.ReadType.Replay); + + verify(consumerMock, times(1)).sendMessages( + argThat(arg -> { + assertEquals(arg.size(), 1); + Entry entry = arg.get(0); + assertEquals(entry.getLedgerId(), 1); + assertEquals(entry.getEntryId(), 3); + return true; + }), + any(EntryBatchSizes.class), + any(EntryBatchIndexesAcks.class), + anyInt(), + anyLong(), + anyLong(), + any(RedeliveryTracker.class) + ); verify(slowConsumerMock, times(0)).sendMessages( anyList(), any(EntryBatchSizes.class), @@ -421,7 +425,7 @@ public void testMessageRedelivery() throws 
@@ -421,7 +425,7 @@ public void testMessageRedelivery() throws Exception {
         final List<Entry> readEntries = new ArrayList<>();
         readEntries.add(allEntries.get(2)); // message3

-        final Consumer consumer1 = mock(Consumer.class);
+        final Consumer consumer1 = createMockConsumer();
         doReturn("consumer1").when(consumer1).consumerName();
         // Change availablePermits of consumer1 to 0 and then back to normal
         when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10);
@@ -437,7 +441,7 @@ public void testMessageRedelivery() throws Exception {
         }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class),
                 anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));

-        final Consumer consumer2 = mock(Consumer.class);
+        final Consumer consumer2 = createMockConsumer();
         doReturn("consumer2").when(consumer2).consumerName();
         when(consumer2.getAvailablePermits()).thenReturn(10);
         doReturn(true).when(consumer2).isWritable();
@@ -619,7 +623,7 @@ public void testLastSentPositionAndIndividuallySentPositions(final boolean initi
                 PositionFactory.create(1, -1), PositionFactory.create(3, 19))), 60);

         // Add a consumer
-        final Consumer consumer1 = mock(Consumer.class);
+        final Consumer consumer1 = createMockConsumer();
         doReturn("consumer1").when(consumer1).consumerName();
         when(consumer1.getAvailablePermits()).thenReturn(1000);
         doReturn(true).when(consumer1).isWritable();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index e8fd537831673..ddf7b0f1d5ee2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.client.api;

+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.broker.BrokerTestUtil.receiveMessages;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -32,6 +35,7 @@
 import java.lang.reflect.Field;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -62,6 +66,7 @@
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
@@ -91,7 +96,7 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;

-@Test(groups = "flaky")
+@Test(groups = "broker-impl")
 public class KeySharedSubscriptionTest extends ProducerConsumerBase {

     private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
@@ -155,6 +160,12 @@ public void resetDefaultNamespace() throws Exception {
                 admin.topics().delete(topicName, false);
             }
         }
+        // reset read ahead limits to defaults
+        ServiceConfiguration defaultConf = new ServiceConfiguration();
+        conf.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(
+                defaultConf.getKeySharedLookAheadMsgInReplayThresholdPerSubscription());
+        conf.setKeySharedLookAheadMsgInReplayThresholdPerConsumer(
+                defaultConf.getKeySharedLookAheadMsgInReplayThresholdPerConsumer());
     }

     private static final Random random = new Random(System.nanoTime());
@@ -630,8 +641,11 @@ public void testOrderingWhenAddingConsumers() throws Exception {
     }

     @Test
-    public void testReadAheadWhenAddingConsumers() throws Exception {
-        String topic = "testReadAheadWhenAddingConsumers-" + UUID.randomUUID();
+    public void testReadAheadWithConfiguredLookAheadLimit() throws Exception {
+        String topic = "testReadAheadWithConfiguredLookAheadLimit-" + UUID.randomUUID();
+
+        // Set the look ahead limit to 50 for subscriptions
+        conf.setKeySharedLookAheadMsgInReplayThresholdPerSubscription(50);

         @Cleanup
         Producer<Integer> producer = createProducer(topic, false);
@@ -679,7 +693,8 @@ public void testReadAheadWithConfiguredLookAheadLimit() throws Exception {

         // We need to ensure that dispatcher does not keep to look ahead in the topic,
         Position readPosition = sub.getCursor().getReadPosition();
-        assertTrue(readPosition.getEntryId() < 1000);
+        long entryId = readPosition.getEntryId();
+        assertTrue(entryId < 100);
     }

     @Test
@@ -1296,7 +1311,7 @@ public void testCheckBetweenSkippingAndRecentlyJoinedConsumers(boolean preSend)
         redeliveryMessagesField.setAccessible(true);
         final MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController)
                 redeliveryMessagesField.get(dispatcher);
-        final Set<Position> replayMsgSet = redeliveryMessages.getMessagesToReplayNow(3);
+        final Set<Position> replayMsgSet = redeliveryMessages.getMessagesToReplayNow(3, item -> true);
         assertEquals(replayMsgSet.size(), 1);
         final Position replayMsg = replayMsgSet.stream().findAny().get();
         assertEquals(replayMsg, PositionFactory.create(msg1Id.getLedgerId(), msg1Id.getEntryId()));
@@ -2302,4 +2317,130 @@ public void testRecentJoinedPosWillNotStuckOtherConsumer(boolean allowKeySharedOutOfOrderDelivery)
         producer.close();
         admin.topics().delete(topic, false);
     }
+
+    @Test
+    public void testReadAheadLimit() throws Exception {
+        String topic = "testReadAheadLimit-" + UUID.randomUUID();
+        int numberOfKeys = 1000;
+        long pauseTime = 100L;
+        int readAheadLimit = 20;
+        pulsar.getConfig().setKeySharedLookAheadMsgInReplayThresholdPerSubscription(readAheadLimit);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic, false);
+
+        // create a consumer and close it to create a subscription
+        String subscriptionName = "key_shared";
+        pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .subscribe()
+                .close();
+
+        Topic t = pulsar.getBrokerService().getTopicIfExists(topic).get().get();
+        PersistentSubscription sub = (PersistentSubscription) t.getSubscription(subscriptionName);
+        // get the dispatcher reference
+        PersistentStickyKeyDispatcherMultipleConsumers dispatcher =
+                (PersistentStickyKeyDispatcherMultipleConsumers) sub.getDispatcher();
+
+        // reusable check that the number of messages in replay never exceeds the configured limit
+        Runnable checkLimit = () -> {
+            assertThat(dispatcher.getNumberOfMessagesInReplay()).isLessThanOrEqualTo(readAheadLimit);
+        };
+
+        // Adding a new consumer.
+        @Cleanup
+        Consumer<Integer> c1 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c1")
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .startPaused(true) // start paused
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c2 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c2")
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(500) // use a large receiver queue size
+                .subscribe();
+
+        @Cleanup
+        Consumer<Integer> c3 = pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .consumerName("c3")
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .receiverQueueSize(10)
+                .startPaused(true) // start paused
+                .subscribe();
+
+        // find keys that will be assigned to c2
+        List<String> keysForC2 = new ArrayList<>();
+        for (int i = 0; i < numberOfKeys; i++) {
+            String key = String.valueOf(i);
+            byte[] keyBytes = key.getBytes(UTF_8);
+            int hash = StickyKeyConsumerSelector.makeStickyKeyHash(keyBytes);
+            if (dispatcher.getSelector().select(hash).consumerName().equals("c2")) {
+                keysForC2.add(key);
+            }
+        }
+
+        Set<Integer> remainingMessageValues = new HashSet<>();
+        // produce messages with keys that all get assigned to c2
+        for (int i = 0; i < 1000; i++) {
+            String key = keysForC2.get(random.nextInt(keysForC2.size()));
+            //log.info("Producing message with key: {} value: {}", key, i);
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+            remainingMessageValues.add(i);
+        }
+
+        checkLimit.run();
+
+        Thread.sleep(pauseTime);
+        checkLimit.run();
+
+        Thread.sleep(pauseTime);
+        checkLimit.run();
+
+        // resume c1 and c3
+        c1.resume();
+        c3.resume();
+
+        Thread.sleep(pauseTime);
+        checkLimit.run();
+
+        // produce more messages
+        for (int i = 1000; i < 2000; i++) {
+            String key = String.valueOf(random.nextInt(numberOfKeys));
+            producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+            remainingMessageValues.add(i);
+            checkLimit.run();
+        }
+
+        // consume the messages
+        receiveMessages((consumer, msg) -> {
+            synchronized (this) {
+                try {
+                    consumer.acknowledge(msg);
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+                remainingMessageValues.remove(msg.getValue());
+                checkLimit.run();
+                return true;
+            }
+        }, Duration.ofSeconds(2), c1, c2, c3);
+
+        assertEquals(remainingMessageValues, Collections.emptySet());
+    }
 }
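The key-probing loop in that test is a useful pattern on its own: given the dispatcher's selector, you can precompute which keys route to which consumer before producing. A hypothetical helper extracted from the test (the method name is illustrative; the calls to makeStickyKeyHash, select and consumerName are taken directly from the test above):

    // Illustrative helper: returns keys "0".."numberOfKeys-1" whose sticky key hash
    // is routed by this selector to the consumer with the given name.
    static List<String> keysRoutedTo(StickyKeyConsumerSelector selector, String consumerName, int numberOfKeys) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < numberOfKeys; i++) {
            String key = String.valueOf(i);
            int hash = StickyKeyConsumerSelector.makeStickyKeyHash(key.getBytes(UTF_8));
            if (selector.select(hash).consumerName().equals(consumerName)) {
                keys.add(key);
            }
        }
        return keys;
    }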
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
index ef070250ca1aa..0cf2e49d35bee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerConsumerBase.java
@@ -18,15 +18,15 @@
  */
 package org.apache.pulsar.client.api;

+import static org.apache.pulsar.broker.BrokerTestUtil.receiveMessagesInThreads;
 import com.google.common.collect.Sets;
-
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
-
-import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -78,32 +78,16 @@ protected <T> ReceivedMessages<T> receiveAndAckMessages(
             BiFunction<MessageId, T, Boolean> ackPredicate,
             Consumer<T>...consumers) throws Exception {
         ReceivedMessages<T> receivedMessages = new ReceivedMessages<>();
-        while (true) {
-            int receivedMsgCount = 0;
-            for (int i = 0; i < consumers.length; i++) {
-                Consumer<T> consumer = consumers[i];
-                while (true) {
-                    Message<T> msg = consumer.receive(2, TimeUnit.SECONDS);
-                    if (msg != null) {
-                        receivedMsgCount++;
-                        T v = msg.getValue();
-                        MessageId messageId = msg.getMessageId();
-                        receivedMessages.messagesReceived.add(Pair.of(msg.getMessageId(), v));
-                        if (ackPredicate.apply(messageId, v)) {
-                            consumer.acknowledge(msg);
-                            receivedMessages.messagesAcked.add(Pair.of(msg.getMessageId(), v));
-                        }
-                    } else {
-                        break;
-                    }
-                }
+        receiveMessagesInThreads((consumer, msg) -> {
+            T v = msg.getValue();
+            MessageId messageId = msg.getMessageId();
+            receivedMessages.messagesReceived.add(Pair.of(msg.getMessageId(), v));
+            if (ackPredicate.apply(messageId, v)) {
+                consumer.acknowledgeAsync(msg);
+                receivedMessages.messagesAcked.add(Pair.of(msg.getMessageId(), v));
             }
-            // Because of the possibility of consumers getting stuck with each other, only jump out of the loop if all
-            // consumers could not receive messages.
-            if (receivedMsgCount == 0) {
-                break;
-            }
-        }
+            return true;
+        }, Duration.ofSeconds(2), consumers);
         return receivedMessages;
     }

@@ -113,9 +97,9 @@ protected <T> ReceivedMessages<T> ackAllMessages(Consumer<T>...consumers) throws Exception {

     protected static class ReceivedMessages<T> {

-        List<Pair<MessageId, T>> messagesReceived = new ArrayList<>();
+        List<Pair<MessageId, T>> messagesReceived = Collections.synchronizedList(new ArrayList<>());

-        List<Pair<MessageId, T>> messagesAcked = new ArrayList<>();
+        List<Pair<MessageId, T>> messagesAcked = Collections.synchronizedList(new ArrayList<>());

         public boolean hasReceivedMessage(T v) {
             for (Pair<MessageId, T> pair : messagesReceived) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
index 1d534176e8d61..7889b19e5b29e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/KeySharedSubscriptionTest.java
@@ -151,10 +151,12 @@ public void testCanRecoverConsumptionWhenLiftMaxUnAckedMessagesRestriction(SubscriptionType subscriptionType)
                 .until(() -> (System.currentTimeMillis() - lastActiveTime.get()) > TimeUnit.SECONDS.toMillis(5));

+        logTopicStats(topic);
+
         //Determine if all messages have been received.
         //If the dispatcher is stuck, we can not receive enough messages.
-        Assert.assertEquals(pubMessages.size(), totalMsg);
-        Assert.assertEquals(pubMessages.size(), recMessages.size());
+        Assert.assertEquals(totalMsg, pubMessages.size());
+        Assert.assertEquals(recMessages.size(), pubMessages.size());
         Assert.assertTrue(recMessages.containsAll(pubMessages));

         // cleanup