[Bug] Topic compaction causes subscription backlog growth #23245

Open
2 of 3 tasks
marekczajkowski opened this issue Sep 3, 2024 · 5 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@marekczajkowski
Contributor

marekczajkowski commented Sep 3, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

OSX 14.3.1 (23D60), JDK 17, Pulsar 3.3.1

Minimal reproduce step

When the last message sent to a topic has no value and the topic is then compacted, a subscription that reads the compacted topic still reports messages in its backlog. See the JUnit test below: it fails, but it should pass.

@Test
    public void testCompactionWithEmptyValue() throws Exception {
        String topic = "persistent://my-property/use/my-ns/test1";

        Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .enableBatching(false)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        // Create the subscription before publishing, then close the consumer.
        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

        // The last message on the topic has a key but no value.
        producer.newMessage().key("withValue").value("data".getBytes()).send();
        producer.newMessage().key("emptyValue").send();

        // Trigger compaction and wait until it reports success.
        admin.topics().triggerCompaction(topic);
        Awaitility.await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> Assert.assertEquals(admin.topics().compactionStatus(topic).status, Status.SUCCESS));

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Exclusive)
            .readCompacted(true).subscribe()) {
            // Receive whatever the compacted view delivers and acknowledge it.
            for (int i = 0; i < 2; i++) {
                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                if (m != null) {
                    consumer.acknowledgeCumulative(m);
                }
            }

            // Expected: no backlog after everything delivered has been acknowledged. This assertion fails.
            Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0));
        }
    }

What did you expect to see?

No messages in the backlog of a subscription that reads the compacted topic.

What did you see instead?

The message backlog is not empty when the subscription reads the compacted topic.

Anything else?

I verified this behavior both on a running Pulsar instance and in a unit test. If you change the second message from
producer.newMessage().key("emptyValue").send() to producer.newMessage().key("emptyValue").value("data".getBytes()).send()
the test passes.

The behavior should be the same regardless of whether the last message on the topic has a payload.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
marekczajkowski added the type/bug label Sep 3, 2024
@lhotari
Member

lhotari commented Sep 3, 2024

This might be fixed by #22372 . The fix will be included in Pulsar 3.3.2 and Pulsar 3.0.7 . (The PR had invalid release labels, fixed the 3.0.x label now).

@marekczajkowski
Contributor Author

Unfortunately, it does not fix the issue. I created a branch from master and added this test to verify, and it still fails. Please see this draft PR: https://github.com/apache/pulsar/pull/23247/files

@lhotari
Member

lhotari commented Sep 4, 2024

Thanks for verifying @marekczajkowski

@marekczajkowski
Contributor Author

In the above test, two messages are sent (5:0 and 5:1).
On the compaction subscription we can see the cursor:
markDeletePosition=5:1
readPosition=5:2
This is fine: compaction read both messages and moved the cursor to the next upcoming message. It also created a compacted ledger and stored the non-empty message (5:0) in it. Message 5:1 has no payload, so it is not present in the compacted ledger.
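
The compaction outcome described above can be modeled in a few lines. This is a simplified, self-contained sketch and not Pulsar's compactor: the names CompactionSketch, Msg and compact are invented for illustration, and positions are plain strings. It assumes only what the comment states, namely that the latest message per key is kept and a message without a payload does not end up in the compacted ledger.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {

    /** One message on the topic: "ledger:entry" position, key, and payload (empty means no value). */
    static final class Msg {
        final String position;
        final String key;
        final byte[] payload;

        Msg(String position, String key, byte[] payload) {
            this.position = position;
            this.key = key;
            this.payload = payload;
        }
    }

    /** Keeps the latest message per key, then drops keys whose latest message has no payload. */
    static List<Msg> compact(List<Msg> topic) {
        Map<String, Msg> latestByKey = new LinkedHashMap<>();
        for (Msg m : topic) {
            latestByKey.put(m.key, m); // a later message replaces an earlier one with the same key
        }
        List<Msg> compacted = new ArrayList<>();
        for (Msg m : latestByKey.values()) {
            if (m.payload.length > 0) { // messages without a payload are left out
                compacted.add(m);
            }
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<Msg> topic = List.of(
            new Msg("5:0", "withValue", "data".getBytes()),
            new Msg("5:1", "emptyValue", new byte[0]));

        for (Msg m : compact(topic)) {
            System.out.println(m.position + " " + m.key); // prints "5:0 withValue"
        }
    }
}
```

Only 5:0 survives, which matches the compacted ledger contents reported above.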

On "sub1" we can see:
markDeletePosition=5:0
readPosition=5:2
markDeletePosition is correct, as it marks the message with a payload that has been read. There are no other messages on this ledger.
readPosition is calculated as the position after lastCompactedEntry (5:1), so it is set to 5:2. I don't know if that's OK. It looks fine, since the next upcoming message on the topic would indeed be 5:2, but on the compacted ledger we get a backlog that counts 5:1 as the difference between readPosition and markDeletePosition.

The outcome is wrong: a subscription that has read the compacted topic reports a backlog, which is not true. There are no unacked messages there. On the other hand, readPosition on sub1 probably should not be set to 5:1, as this message is gone.
[Screenshot 2024-09-4 at 13 36 38]
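
The backlog arithmetic in this comment can be written out as a small sketch. It assumes, as the comment does, that the reported backlog corresponds to the entries strictly between markDeletePosition and readPosition within a single ledger. The broker's real calculation may differ, and the names BacklogSketch and entriesBetween are hypothetical.

```java
public class BacklogSketch {

    /** Parses the ledger id out of a "ledgerId:entryId" position string. */
    static long ledgerId(String position) {
        return Long.parseLong(position.substring(0, position.indexOf(':')));
    }

    /** Parses the entry id out of a "ledgerId:entryId" position string. */
    static long entryId(String position) {
        return Long.parseLong(position.substring(position.indexOf(':') + 1));
    }

    /** Counts entries strictly between markDeletePosition and readPosition (single ledger only). */
    static long entriesBetween(String markDeletePosition, String readPosition) {
        if (ledgerId(markDeletePosition) != ledgerId(readPosition)) {
            throw new IllegalArgumentException("this sketch only handles positions in one ledger");
        }
        return entryId(readPosition) - entryId(markDeletePosition) - 1;
    }

    public static void main(String[] args) {
        // Compaction cursor: markDeletePosition=5:1, readPosition=5:2
        System.out.println("compaction cursor: " + entriesBetween("5:1", "5:2")); // prints 0
        // sub1: markDeletePosition=5:0, readPosition=5:2
        System.out.println("sub1: " + entriesBetween("5:0", "5:2")); // prints 1
    }
}
```

Under this model the single entry counted for sub1 is 5:1, the message without a payload that a consumer reading the compacted topic never receives and therefore cannot acknowledge.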

@marekczajkowski
Contributor Author

This behavior leads to infinite redelivery of messages when batching is enabled.

I modified the test a bit:

@Test
    public void testCompactionWithEmptyValue() throws Exception {
        String topic = "persistent://my-property/use/my-ns/test1";

        // Batching is enabled with a batch size of 2, so both messages go out in one batch.
        Producer<byte[]> producer = pulsarClient.newProducer()
            .topic(topic)
            .enableBatching(true)
            .batchingMaxMessages(2)
            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
            .messageRoutingMode(MessageRoutingMode.SinglePartition)
            .create();

        // The last message in the batch has a key but no value.
        producer.newMessage().key("withValue").value("data".getBytes()).sendAsync();
        producer.newMessage().key("emptyValue").send();

        // compact(topic) is a helper in the test class (not shown here).
        compact(topic);

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionMode(SubscriptionMode.Durable)
            .ackTimeout(1, TimeUnit.SECONDS)
            .readCompacted(true).subscribe()) {
            // Print and acknowledge every message that arrives; redeliveries show up as repeated lines.
            for (int i = 0; i < 10; i++) {
                Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
                if (m != null) {
                    System.out.println(m.getKey() + " " + m.getMessageId());
                    consumer.acknowledgeCumulative(m);
                }
            }

            // Expected: no backlog after everything delivered has been acknowledged.
            Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().get("sub1").getMsgBacklog(), 0));
        }
    }

Log output:

2024-09-05T16:28:10,805 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-2-0:ServerCnx] - [/127.0.0.1:49307] Created subscription on topic persistent://my-property/use/my-ns/test1 / sub1
2024-09-05T16:28:10,805 - INFO  - [pulsar-client-io-36-4:ConsumerImpl] - [persistent://my-property/use/my-ns/test1][sub1] Subscribed to topic on localhost/127.0.0.1:49189 -- consumer: 1
withValue 5:0:-1:0
2024-09-05T16:28:10,901 - INFO  - [PulsarTestContext-executor-OrderedExecutor-0-0:PulsarMockBookKeeper] - Creating ledger 8
2024-09-05T16:28:10,904 - INFO  - [bookkeeper-ml-scheduler-OrderedScheduler-2-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1] Updated cursor sub1 with ledger id 8 md-position=5:-1 rd-position=5:1
2024-09-05T16:28:12,799 - INFO  - [pulsar-timer-69-1:UnAckedMessageTracker] - [ConsumerBase{subscription='sub1', consumerName='62d31', topic='persistent://my-property/use/my-ns/test1'}] 1 messages will be re-delivered
2024-09-05T16:28:12,805 - INFO  - [broker-topic-workers-OrderedExecutor-5-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1-sub1] Rewind from 5:1 to 5:0
withValue 5:0:-1:0
2024-09-05T16:28:14,802 - INFO  - [pulsar-timer-69-1:UnAckedMessageTracker] - [ConsumerBase{subscription='sub1', consumerName='62d31', topic='persistent://my-property/use/my-ns/test1'}] 1 messages will be re-delivered
2024-09-05T16:28:14,803 - INFO  - [broker-topic-workers-OrderedExecutor-5-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1-sub1] Rewind from 5:1 to 5:0
withValue 5:0:-1:0
2024-09-05T16:28:16,804 - INFO  - [pulsar-timer-69-1:UnAckedMessageTracker] - [ConsumerBase{subscription='sub1', consumerName='62d31', topic='persistent://my-property/use/my-ns/test1'}] 1 messages will be re-delivered
2024-09-05T16:28:16,807 - INFO  - [broker-topic-workers-OrderedExecutor-5-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1-sub1] Rewind from 5:1 to 5:0
withValue 5:0:-1:0
2024-09-05T16:28:18,809 - INFO  - [pulsar-timer-69-1:UnAckedMessageTracker] - [ConsumerBase{subscription='sub1', consumerName='62d31', topic='persistent://my-property/use/my-ns/test1'}] 1 messages will be re-delivered
2024-09-05T16:28:18,812 - INFO  - [broker-topic-workers-OrderedExecutor-5-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1-sub1] Rewind from 5:1 to 5:0
withValue 5:0:-1:0
2024-09-05T16:28:20,812 - INFO  - [pulsar-timer-69-1:UnAckedMessageTracker] - [ConsumerBase{subscription='sub1', consumerName='62d31', topic='persistent://my-property/use/my-ns/test1'}] 1 messages will be re-delivered
2024-09-05T16:28:20,813 - INFO  - [broker-topic-workers-OrderedExecutor-5-0:ManagedCursorImpl] - [my-property/use/my-ns/persistent/test1-sub1] Rewind from 5:1 to 5:0
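
One possible reading of the log above, offered as an assumption and not a confirmed diagnosis: without batch index acknowledgment, a batched entry only counts as acknowledged once every message in the batch has been acknowledged. Entry 5:0 holds two messages, but only batch index 0 (withValue) is ever delivered, so the entry may never become fully acknowledged and the ack timeout keeps firing. The sketch below models just that bookkeeping with a BitSet. BatchAckSketch and BatchAckState are hypothetical names, not Pulsar classes.

```java
import java.util.BitSet;

public class BatchAckSketch {

    /** Tracks which messages of one batched entry are still unacknowledged. */
    static final class BatchAckState {
        private final BitSet pending;

        BatchAckState(int batchSize) {
            pending = new BitSet(batchSize);
            pending.set(0, batchSize); // every batch index starts out pending
        }

        /** Cumulatively acknowledges indexes 0..batchIndex; returns true once the whole entry is acknowledged. */
        boolean ackCumulative(int batchIndex) {
            pending.clear(0, batchIndex + 1);
            return pending.isEmpty();
        }
    }

    public static void main(String[] args) {
        // Entry 5:0 holds two messages: "withValue" (index 0) and "emptyValue" (index 1).
        BatchAckState entry = new BatchAckState(2);

        // Assumption: only index 0 reaches the consumer, because index 1 has no payload.
        boolean entryAcked = entry.ackCumulative(0);

        System.out.println("entry 5:0 fully acknowledged: " + entryAcked); // prints false
    }
}
```

If this reading is right, the loop ends only when index 1 is also treated as acknowledged, which the consumer cannot do for a message it never receives.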
