Skip to content

Commit 8a198ee

Browse files
committed
Make dynamic batch pump more aggressively
Low value for maxUnconfirmedMessages combined with unfortunate timing can make the dynamic batch flush only on timeout. This commit makes the dynamic batch class "pump" for new items (messages) more aggressively, which mitigates the problem. References #750
1 parent 05df2a5 commit 8a198ee

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,22 +69,30 @@ private void loop() {
6969
if (state.items.size() >= state.batchSize) {
7070
this.maybeCompleteBatch(state, true);
7171
} else {
72-
item = this.requests.poll();
73-
if (item == null) {
74-
this.maybeCompleteBatch(state, false);
75-
} else {
76-
state.items.add(item);
77-
if (state.items.size() >= state.batchSize) {
78-
this.maybeCompleteBatch(state, true);
79-
}
80-
}
72+
pump(state, 2);
8173
}
8274
} else {
8375
this.maybeCompleteBatch(state, false);
8476
}
8577
}
8678
}
8779

80+
private void pump(State<T> state, int pumpCount) {
81+
if (pumpCount <= 0) {
82+
return;
83+
}
84+
T item = this.requests.poll();
85+
if (item == null) {
86+
this.maybeCompleteBatch(state, false);
87+
} else {
88+
state.items.add(item);
89+
if (state.items.size() >= state.batchSize) {
90+
this.maybeCompleteBatch(state, true);
91+
}
92+
this.pump(state, pumpCount - 1);
93+
}
94+
}
95+
8896
private static final class State<T> {
8997

9098
int batchSize;

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import com.rabbitmq.stream.impl.TestUtils.Sync;
2424
import java.util.Locale;
2525
import java.util.Random;
26+
import java.util.concurrent.Semaphore;
27+
import java.util.concurrent.TimeUnit;
2628
import java.util.concurrent.atomic.AtomicBoolean;
2729
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.atomic.AtomicLong;
2831
import java.util.stream.IntStream;
2932
import org.junit.jupiter.api.Test;
3033

@@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception {
118121
waitAtMost(() -> collected.get() == itemCount);
119122
}
120123
}
124+
125+
@Test
126+
void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
127+
int batchSize = 10;
128+
Semaphore semaphore = new Semaphore(batchSize);
129+
DynamicBatch.BatchConsumer<Long> action =
130+
items -> {
131+
semaphore.release(items.size());
132+
return true;
133+
};
134+
135+
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize)) {
136+
MetricRegistry metrics = new MetricRegistry();
137+
Meter rate = metrics.meter("publishing-rate");
138+
AtomicBoolean keepGoing = new AtomicBoolean(true);
139+
AtomicLong sequence = new AtomicLong();
140+
new Thread(
141+
() -> {
142+
while (keepGoing.get() && !Thread.interrupted()) {
143+
long id = sequence.getAndIncrement();
144+
if (semaphore.tryAcquire()) {
145+
batch.add(id);
146+
rate.mark();
147+
}
148+
}
149+
})
150+
.start();
151+
long start = System.nanoTime();
152+
waitAtMost(
153+
() ->
154+
System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000);
155+
}
156+
}
121157
}

0 commit comments

Comments
 (0)