Skip to content

Commit

Permalink
[improve][broker] Reschedule reads with increasing delay when no mess…
Browse files Browse the repository at this point in the history
…ages are dispatched
  • Loading branch information
lhotari committed Aug 26, 2024
1 parent a6029ad commit 8068b31
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
// rescheduling a read after no entries are dispatches will be delayed by this duration using a backoff
private static final int RESCHEDULE_READ_INITIAL_DELAY_MS = 100;
// maximum delay for rescheduling a read after no entries are dispatched
private static final int RESCHEDULE_READ_INITIAL_MAX_DELAY_MS = 5000;

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
Expand Down Expand Up @@ -134,7 +138,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;

protected int lastNumberOfEntriesDispatched;
private final Backoff rescheduleReadBackoff = new Backoff(RESCHEDULE_READ_INITIAL_DELAY_MS, TimeUnit.MILLISECONDS,
RESCHEDULE_READ_INITIAL_MAX_DELAY_MS, TimeUnit.MILLISECONDS, 0,
TimeUnit.MILLISECONDS);

protected enum ReadType {
Normal, Replay
Expand Down Expand Up @@ -438,16 +445,20 @@ private boolean shouldPauseOnAckStatePersist(ReadType readType) {

@Override
protected void reScheduleRead() {
reScheduleReadInMs(MESSAGE_RATE_BACKOFF_MS);
}

protected void reScheduleReadInMs(long readAfterMs) {
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
readAfterMs, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -660,8 +671,8 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
}

long size = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(size);
long totalBytesSize = entries.stream().mapToLong(Entry::getLength).sum();
updatePendingBytesToDispatch(totalBytesSize);

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
Expand All @@ -671,19 +682,28 @@ public final synchronized void readEntriesComplete(List<Entry> entries, Object c
// in a separate thread, and we want to prevent more reads
acquireSendInProgress();
dispatchMessagesThread.execute(() -> {
if (sendMessagesToConsumers(readType, entries, false)) {
updatePendingBytesToDispatch(-size);
readMoreEntries();
} else {
updatePendingBytesToDispatch(-size);
}
handleSendingMessagesAndReadingMore(readType, entries, false, totalBytesSize);
});
} else {
if (sendMessagesToConsumers(readType, entries, true)) {
updatePendingBytesToDispatch(-size);
readMoreEntriesAsync();
} else {
updatePendingBytesToDispatch(-size);
handleSendingMessagesAndReadingMore(readType, entries, true, totalBytesSize);
}
}

private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, List<Entry> entries,
boolean needAcquireSendInProgress,
long totalBytesSize) {
boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress);
int entriesDispatched = lastNumberOfEntriesDispatched;
updatePendingBytesToDispatch(-totalBytesSize);
if (triggerReadingMore) {
if (entriesDispatched > 0) {
// Reset the backoff when we successfully dispatched messages
rescheduleReadBackoff.reset();
// Call readMoreEntries in the same thread to trigger the next read
readMoreEntries();
} else if (entriesDispatched == 0) {
// If no messages were dispatched, we need to reschedule a new read with an increasing backoff delay
reScheduleReadInMs(rescheduleReadBackoff.next());
}
}
}
Expand Down Expand Up @@ -722,6 +742,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
if (needTrimAckedMessages()) {
cursor.trimDeletedEntries(entries);
}
lastNumberOfEntriesDispatched = 0;

int entriesToDispatch = entries.size();
// Trigger read more messages
Expand Down Expand Up @@ -829,6 +850,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});

lastNumberOfEntriesDispatched = entriesToDispatch;
}
return true;
}
Expand Down Expand Up @@ -891,6 +914,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ protected Map<Consumer, List<Position>> initialValue() throws Exception {

@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesDispatched = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
Expand Down Expand Up @@ -420,6 +421,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

lastNumberOfEntriesDispatched = (int) totalEntries;

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

Expand Down

0 comments on commit 8068b31

Please sign in to comment.