Skip to content

Commit

Permalink
[fix][broker] Fix retry backoff for PersistentDispatcherMultipleConsu…
Browse files Browse the repository at this point in the history
…mers (apache#23284)
  • Loading branch information
lhotari authored and michalcukierman committed Sep 11, 2024
1 parent 4592a08 commit f8badce
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,12 +451,15 @@ protected void reScheduleReadInMs(long readAfterMs) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, readAfterMs);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
readAfterMs, TimeUnit.MILLISECONDS);
Runnable runnable = () -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
};
if (readAfterMs > 0) {
topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
} else {
topic.getBrokerService().executor().execute(runnable);
}
}
}

Expand Down Expand Up @@ -836,6 +839,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesDispatched = (int) totalEntries;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
Expand All @@ -848,9 +852,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entry.release();
});

lastNumberOfEntriesDispatched = entriesToDispatch;
}

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
final String topicName = "persistent://public/default/testTopic";
final String subscriptionName = "testSubscription";
private AtomicInteger consumerMockAvailablePermits;
int retryBackoffInitialTimeInMs = 10;
int retryBackoffMaxTimeInMs = 50;

@BeforeMethod
public void setup() throws Exception {
Expand All @@ -120,8 +122,8 @@ public void setup() throws Exception {
doReturn(1).when(configMock).getSubscriptionKeySharedConsistentHashingReplicaPoints();
doReturn(true).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
doReturn(false).when(configMock).isAllowOverrideEntryFilters();
doReturn(10).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doReturn(50).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
doAnswer(invocation -> retryBackoffInitialTimeInMs).when(configMock).getDispatcherRetryBackoffInitialTimeInMs();
doAnswer(invocation -> retryBackoffMaxTimeInMs).when(configMock).getDispatcherRetryBackoffMaxTimeInMs();
pulsarMock = mock(PulsarService.class);
doReturn(configMock).when(pulsarMock).getConfiguration();

Expand Down Expand Up @@ -825,42 +827,53 @@ public void testLastSentPositionAndIndividuallySentPositions(final boolean initi
assertEquals(persistentDispatcher.getLastSentPosition(), initialLastSentPosition.toString());
}

@DataProvider(name = "dispatchMessagesInSubscriptionThread")
private Object[][] dispatchMessagesInSubscriptionThread() {
return new Object[][] { { false }, { true } };
@DataProvider(name = "testBackoffDelayWhenNoMessagesDispatched")
private Object[][] testBackoffDelayWhenNoMessagesDispatchedParams() {
return new Object[][] { { false, true }, { true, true }, { true, false }, { false, false } };
}

@Test(dataProvider = "dispatchMessagesInSubscriptionThread")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread)
@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenNoMessagesDispatched(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock).isDispatcherDispatchMessagesInSubscriptionThread();
persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
persistentDispatcher.addConsumer(consumerMock);
dispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 10, "Initial retry delay should be 10ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
Expand All @@ -870,7 +883,7 @@ protected void reScheduleReadInMs(long readAfterMs) {
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
Expand All @@ -881,21 +894,104 @@ protected void reScheduleReadInMs(long readAfterMs) {
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(persistentDispatcher.isSendInProgress()));
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
persistentDispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 10, "Resetted retry delay should be 10ms");
}
);
}

@Test(dataProvider = "testBackoffDelayWhenNoMessagesDispatched")
public void testBackoffDelayWhenRetryDelayDisabled(boolean dispatchMessagesInSubscriptionThread, boolean isKeyShared)
throws Exception {
persistentDispatcher.close();

// it should be possible to disable the retry delay
// by setting retryBackoffInitialTimeInMs and retryBackoffMaxTimeInMs to 0
retryBackoffInitialTimeInMs=0;
retryBackoffMaxTimeInMs=0;

List<Long> retryDelays = new CopyOnWriteArrayList<>();
doReturn(dispatchMessagesInSubscriptionThread).when(configMock)
.isDispatcherDispatchMessagesInSubscriptionThread();

PersistentDispatcherMultipleConsumers dispatcher;
if (isKeyShared) {
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
topicMock, cursorMock, subscriptionMock, configMock,
new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
} else {
dispatcher = new PersistentDispatcherMultipleConsumers(topicMock, cursorMock, subscriptionMock) {
@Override
protected void reScheduleReadInMs(long readAfterMs) {
retryDelays.add(readAfterMs);
}
};
}

// add a consumer without permits to trigger the retry behavior
consumerMockAvailablePermits.set(0);
dispatcher.addConsumer(consumerMock);

// call "readEntriesComplete" directly to test the retry behavior
List<Entry> entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 1);
assertEquals(retryDelays.get(0), 0, "Initial retry delay should be 0ms");
}
);
// test the second retry delay
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 2);
double delay = retryDelays.get(1);
assertEquals(delay, 0, 0, "Second retry delay should be 0ms");
}
);
// verify the max retry delay
for (int i = 0; i < 100; i++) {
entries = List.of(EntryImpl.create(1, 1, createMessage("message1", 1)));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
}
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 102);
double delay = retryDelays.get(101);
assertEquals(delay, 0, 0, "Max delay should be 0ms");
}
);
// unblock to check that the retry delay is reset
consumerMockAvailablePermits.set(1000);
entries = List.of(EntryImpl.create(1, 2, createMessage("message2", 1, "key2")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
// wait that the possibly async handling has completed
Awaitility.await().untilAsserted(() -> assertFalse(dispatcher.isSendInProgress()));

// now block again to check the next retry delay so verify it was reset
consumerMockAvailablePermits.set(0);
entries = List.of(EntryImpl.create(1, 3, createMessage("message3", 1, "key3")));
dispatcher.readEntriesComplete(entries, PersistentDispatcherMultipleConsumers.ReadType.Normal);
Awaitility.await().untilAsserted(() -> {
assertEquals(retryDelays.size(), 103);
assertEquals(retryDelays.get(0), 0, "Resetted retry delay should be 0ms");
}
);
}

private ByteBuf createMessage(String message, int sequenceId) {
return createMessage(message, sequenceId, "testKey");
}
Expand Down

0 comments on commit f8badce

Please sign in to comment.