Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
reduce batch limit per partition (#1560)
Browse files Browse the repository at this point in the history
* reduce batch limit per partition

reducing batch limit per partition to avoid throttling healthy partitions
  • Loading branch information
adyach authored Oct 12, 2023
1 parent f34c6f6 commit f7ddc22
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBa
Assert.assertEquals(10, client.getJsonBatches().get(0).getEvents().size());
Assert.assertEquals("001-0001-000000000000000009", client.getJsonBatches().get(0).getCursor().getOffset());

// receive a single event in a batch and commit it so that Nakadi sends the next batch with a single event
client.startWithAutocommit(batches -> {
// 12 because the last one is stream limit reached debug info
Assert.assertEquals(12, batches.size());
Expand Down Expand Up @@ -767,7 +768,7 @@ public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOExceptio
.anyMatch(event -> event.get("foo").equals("{\"foo\":\"bar10\"}"))) {
// skipp commit to introduce poison pill
cursorWithPoisonPill.set(streamBatch.getCursor());
throw new RuntimeException();
throw new RuntimeException("poison pill found");
} else {
try {
NakadiTestUtils.commitCursors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,6 @@ private void rememberEvent(final ConsumedEvent event) {
}

private long getMessagesAllowedToSend() {
if (failedCommitPartitions.values().stream()
.anyMatch(Partition::isLookingForDeadLetter)) {
return 1;
}

final long unconfirmed = offsets.values().stream().mapToLong(PartitionData::getUnconfirmed).sum();
final long limit = getParameters().maxUncommittedMessages - unconfirmed;
return getParameters().getMessagesAllowedToSend(limit, this.sentEvents);
Expand All @@ -263,15 +258,18 @@ private void streamToOutput() {

private void streamToOutput(final boolean streamTimeoutReached) {
final long currentTimeMillis = System.currentTimeMillis();
int messagesAllowedToSend = (int) getMessagesAllowedToSend();
final boolean wasCommitted = isEverythingCommitted();
int messagesAllowedToSend = (int) getMessagesAllowedToSend();
boolean sentSomething = false;

for (final Map.Entry<EventTypePartition, PartitionData> e : offsets.entrySet()) {
final EventTypePartition etp = e.getKey();
final PartitionData partitionData = e.getValue();
Partition partition = failedCommitPartitions.get(etp);

int messagesAllowedForPartition =
(partition != null && partition.isLookingForDeadLetter()) ? 1 : messagesAllowedToSend;
// loop sends all the events from partition, until max uncommitted reached or no more events
while (true) {
if (partition != null && partition.isLookingForDeadLetter()) {
final NakadiCursor lastDeadLetterCursor = getContext().getCursorConverter().convert(
Expand All @@ -283,14 +281,14 @@ private void streamToOutput(final boolean streamTimeoutReached) {
.map(p -> p.toLastDeadLetterOffset(null))
.toArray(Partition[]::new));
failedCommitPartitions.remove(etp);
messagesAllowedToSend = (int) getMessagesAllowedToSend(); // fixme think
partition = null;
messagesAllowedForPartition = messagesAllowedToSend;
}
}

final List<ConsumedEvent> toSend = partitionData.takeEventsToStream(
currentTimeMillis,
Math.min(getBatchLimitEvents(), messagesAllowedToSend),
Math.min(getBatchLimitEvents(), messagesAllowedForPartition),
getParameters().batchTimeoutMillis,
streamTimeoutReached);

Expand Down Expand Up @@ -329,7 +327,9 @@ private void streamToOutput(final boolean streamTimeoutReached) {
if (toSend.isEmpty()) {
break;
}

messagesAllowedToSend -= toSend.size();
messagesAllowedForPartition -= toSend.size();
}
}

Expand Down

0 comments on commit f7ddc22

Please sign in to comment.