Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix race condition in unacked message updating and dispatcher blocking #192

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3338,7 +3338,7 @@ public void checkUnAckMessageDispatching() {
} else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) {
// unblock broker-dispatching if received enough acked messages back
if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) {
unblockDispatchersOnUnAckMessages(blockedDispatchers.values());
unblockDispatchersOnUnAckMessages(blockedDispatchers.values(), true);
}
}

Expand Down Expand Up @@ -3376,13 +3376,17 @@ private void blockDispatchersWithLargeUnAckMessages() {
* Unblocks the dispatchers and removes it from the {@link #blockedDispatchers} list.
*
* @param dispatcherList
* @param allowReadMore
*/
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList,
boolean allowReadMore) {
lock.writeLock().lock();
try {
dispatcherList.forEach(dispatcher -> {
dispatcher.unBlockDispatcherOnUnackedMsgs();
executor().execute(() -> dispatcher.readMoreEntries());
if (allowReadMore) {
executor().execute(() -> dispatcher.readMoreEntries());
}
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
blockedDispatchers.remove(dispatcher);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
+ " for consumerId: {}; avgMessagesPerEntry is {}",
topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get());
}
incrementUnackedMessages(unackedMessages);
addAndGetUnAckedMsgs(this, unackedMessages);
Future<Void> writeAndFlushPromise =
cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx,
entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch);
Expand All @@ -412,14 +412,6 @@ public Future<Void> sendMessages(final List<? extends Entry> entries,
return writeAndFlushPromise;
}

private void incrementUnackedMessages(int unackedMessages) {
if (Subscription.isIndividualAckMode(subType)
&& addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages()
&& getMaxUnackedMessages() > 0) {
blockedConsumerOnUnackedMsgs = true;
}
}

public boolean isWritable() {
return cnx.isWritable();
}
Expand Down Expand Up @@ -793,10 +785,6 @@ public void flowPermits(int additionalNumberOfMessages) {
checkArgument(additionalNumberOfMessages > 0);
this.lastConsumedFlowTimestamp = System.currentTimeMillis();

// block shared consumer when unacked-messages reaches limit
if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
int oldPermits;
if (!blockedConsumerOnUnackedMsgs) {
oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages);
Expand Down Expand Up @@ -879,16 +867,6 @@ public boolean checkAndApplyTopicMigration() {
}
return false;
}
/**
* Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/>
* a. consumer must have Shared-subscription<br/>
* b. {@link this#getMaxUnackedMessages()} value > 0
*
* @return
*/
private boolean shouldBlockConsumerOnUnackMsgs() {
return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0;
}

public void updateRates() {
msgOut.calculateRate();
Expand Down Expand Up @@ -1044,15 +1022,6 @@ private boolean removePendingAcks(PositionImpl position) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}
return false;
Expand All @@ -1068,7 +1037,7 @@ public int getPriorityLevel() {

public void redeliverUnacknowledgedMessages(long consumerEpoch) {
// cleanup unackedMessage bucket and redeliver those unack-msgs again
clearUnAckedMsgs();
UNACKED_MESSAGES_UPDATER.set(this, 0);
blockedConsumerOnUnackedMsgs = false;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId);
Expand Down Expand Up @@ -1143,10 +1112,24 @@ public Subscription getSubscription() {
}

private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
int unackedMsgs = 0;
if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) {
subscription.addUnAckedMessages(ackedMessages);
unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
if (!isPersistentTopic || !Subscription.isIndividualAckMode(subType)) {
return 0;
}
subscription.addUnAckedMessages(ackedMessages);
int unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
int maxUnackedMessages = getMaxUnackedMessages();
if (maxUnackedMessages > 0) {
if (ackedMessages < 0) {
if (unackedMsgs <= maxUnackedMessages / 2 && blockedConsumerOnUnackedMsgs) {
blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(this);
}
} else if (ackedMessages > 0) {
// block shared consumer when unacked-messages reaches limit
if (unackedMsgs >= getMaxUnackedMessages()) {
blockedConsumerOnUnackedMsgs = true;
}
}
}
if (unackedMsgs < 0 && System.currentTimeMillis() - negativeUnackedMsgsTimestamp >= 10_000) {
negativeUnackedMsgsTimestamp = System.currentTimeMillis();
Expand All @@ -1155,11 +1138,6 @@ private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
return unackedMsgs;
}

private void clearUnAckedMsgs() {
int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0);
subscription.addUnAckedMessages(-unaAckedMsgs);
}

public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service.persistent;

import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -1005,6 +1004,7 @@ public boolean isConsumerAvailable(Consumer consumer) {

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
doAddUnAckedMessages(-totalUnackedMessages, false);
consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) {
redeliveryTracker.incrementAndGetRedeliveryCount((PositionImpl.get(ledgerId, entryId)));
Expand All @@ -1019,6 +1019,7 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, long

@Override
public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
doAddUnAckedMessages(-totalUnackedMessages, false);
positions.forEach(position -> {
// TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
// on Key_Shared subscription, but it's difficult to get the sticky key here
Expand All @@ -1034,12 +1035,18 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List

@Override
public void addUnAckedMessages(int numberOfMessages) {
doAddUnAckedMessages(numberOfMessages, true);
}

private void doAddUnAckedMessages(int numberOfMessages, boolean allowReadMore) {
int maxUnackedMessages = topic.getMaxUnackedMessagesOnSubscription();
// don't block dispatching if maxUnackedMessages = 0
if (maxUnackedMessages <= 0 && blockedDispatcherOnUnackedMsgs == TRUE
&& BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.info("[{}] Dispatcher is unblocked, since maxUnackedMessagesPerSubscription=0", name);
readMoreEntriesAsync();
if (allowReadMore) {
readMoreEntriesAsync();
}
}

int unAckedMessages = TOTAL_UNACKED_MESSAGES_UPDATER.addAndGet(this, numberOfMessages);
Expand All @@ -1055,14 +1062,16 @@ public void addUnAckedMessages(int numberOfMessages) {
if (totalUnackedMessages < (topic.getBrokerService().maxUnackedMsgsPerDispatcher / 2)) {
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
// it removes dispatcher from blocked list and unblocks dispatcher by scheduling read
topic.getBrokerService().unblockDispatchersOnUnAckMessages(Lists.newArrayList(this));
topic.getBrokerService().unblockDispatchersOnUnAckMessages(List.of(this), allowReadMore);
}
}
} else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages < maxUnackedMessages / 2) {
} else if (blockedDispatcherOnUnackedMsgs == TRUE && unAckedMessages <= maxUnackedMessages / 2) {
// unblock dispatcher if it acks back enough messages
if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.compareAndSet(this, TRUE, FALSE)) {
log.debug("[{}] Dispatcher is unblocked", name);
readMoreEntriesAsync();
if (allowReadMore) {
readMoreEntriesAsync();
}
}
}
// increment broker-level count
Expand Down
Loading
Loading