-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Bug] Topic compaction causes subscription backlog growth #23245
Comments
This might be fixed by #22372 . The fix will be included in Pulsar 3.3.2 and Pulsar 3.0.7 . (The PR had invalid release labels, fixed the 3.0.x label now). |
Unfortunately it does not fix the issue. I created a branch from master and added this test to verify. It keeps failing. Please see this draft PR |
Thanks for verifying @marekczajkowski |
This behavior leads to infinity re-delivery of messages when batch is enabled. I modified the test a bit @Test
public void testCompactionWithEmptyValue() throws Exception {
String topic = "persistent://my-property/use/my-ns/test1";
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batchingMaxMessages(2)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.newMessage().key("withValue").value("data".getBytes()).sendAsync();
producer.newMessage().key("emptyValue").send();
compact(topic);
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionMode(SubscriptionMode.Durable)
.ackTimeout(1, TimeUnit.SECONDS)
.readCompacted(true).subscribe()) {
for (int i=0; i<10 ; i++) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
if (m != null) {
System.out.println(m.getKey() + " " + m.getMessageId());
consumer.acknowledgeCumulative(m);
}
}
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0));
}
}
|
Search before asking
Read release policy
Version
OSX 14.3.1 (23D60), JDK 17, Pulsar 3.3.1
Minimal reproduce step
When last message sent to topic has no value and such topic is compacted then subscription that reads this compacted topic has messages in backlog. Please check the junit Test below. It fails but it should pass just fine.
What did you expect to see?
No message in backlog of the subscription that reads compacted topic.
What did you see instead?
Message backlog is not empty when subscription read compacted topic.
Anything else?
I verified this behavior on both running Pulsar and unit test. If you change second message
producer.newMessage().key("emptyValue").send()
toproducer.newMessage().key("emptyValue").value("data".getBytes()).send()
The test passes.
The behavior should be the same regardless the presence of payload in last message on the topic.
Are you willing to submit a PR?
The text was updated successfully, but these errors were encountered: