
[Bug] Non durable subscription backlog is wrong after topic unload #23239

michalcukierman opened this issue Aug 30, 2024 · 7 comments · May be fixed by #23305
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

michalcukierman commented Aug 30, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

3.3.1
3.0.5

Minimal reproduce step

  1. Execute the following Java program:
package org.example;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;

public class Main {

  public static void main(String[] args) throws PulsarClientException, InterruptedException {
    AtomicBoolean running = new AtomicBoolean(true);

    String topic = "tbce0";

    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:61723")
        .build();

    Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();

    String message = "Hello, Pulsar!";
    for (int i = 0; i < 1000; i++) {
      producer.send(message.getBytes());
    }
    System.out.println("Sent 1000 copies of: " + message);
    producer.close();

    Thread.ofVirtual().start(() -> {
      try {
        Consumer<byte[]> consumer = client.newConsumer()
            .topic(topic)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscriptionName("bce-subscription-0")
            .subscriptionMode(SubscriptionMode.NonDurable)
            //.readCompacted(true)
            .subscribe();

        int counter = 1;
        while (running.get()) {
          Message<byte[]> receivedMessage = consumer.receive();
          System.out.println(
              "Received " + counter++ + " message: " + new String(receivedMessage.getData()));
          consumer.acknowledge(receivedMessage);
        }
        consumer.close();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      System.out.println("Exiting...");
      running.set(false);
      try {
        client.close();
      } catch (PulsarClientException e) {
        throw new RuntimeException(e);
      }
    }));

    Thread.sleep(600_000);
  }
}
  2. Wait for the results:
...
Received 993 message: Hello, Pulsar!
Received 994 message: Hello, Pulsar!
Received 995 message: Hello, Pulsar!
Received 996 message: Hello, Pulsar!
Received 997 message: Hello, Pulsar!
Received 998 message: Hello, Pulsar!
Received 999 message: Hello, Pulsar!
Received 1000 message: Hello, Pulsar!

  3. Run the following script (if on Kubernetes):
TOPIC=tbce0
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics unload $TOPIC
kubectl exec -n kaap pulsar-broker-0 -- ./bin/pulsar-admin topics stats $TOPIC | grep msgBacklog

What did you expect to see?

The cursor should be set to the end of the topic, and the backlog should be 0.

Before unloading
      "msgBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
After unloading
      "msgBacklog" : 0,
      "msgBacklogNoDelayed" : 0,

What did you see instead?

Backlog is not empty:

Before unloading
      "msgBacklog" : 0,
      "msgBacklogNoDelayed" : 0,
After unloading
      "msgBacklog" : 1,
      "msgBacklogNoDelayed" : 1,

Anything else?

  1. You may want to increase message retention so the messages are not lost between checks; however, this is not required to reproduce the issue.
  2. Unloading is just the easiest way to reproduce; a broker restart may cause the same behavior.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@michalcukierman michalcukierman added the type/bug The PR fixed a bug or issue reported a bug label Aug 30, 2024
@michalcukierman (Author)

Setting:

.subscriptionMode(SubscriptionMode.Durable)

Fixes the problem. So the issue may be in the rewinding of the NonDurableCursor, or in the backlog calculation (if the cursor is set to the next, non-existing message, which is allowed).


summeriiii commented Sep 3, 2024

@michalcukierman

Hi, I reproduced this case following your steps, and I found that this behavior might be within expectation.

When you use the ./bin/pulsar-admin topics unload $TOPIC command to unload the topic, the non-durable subscription is cleaned up. So when the consumer reconnects to the broker, the broker creates a new non-durable cursor; the details are in PersistentTopic#getNonDurableSubscription.

As shown in the screenshot, the entryId is reduced by 1, which results in the topic stats msgBacklog being 1 instead of 0.
(screenshot of the PersistentTopic#getNonDurableSubscription code)
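To illustrate the rewind in isolation, here is a simplified sketch (the class and method names are my own; this is not the actual broker code): on reconnect the consumer sends its last received message id as the start position, and the broker steps the entry id back by one so the start message can be delivered inclusively.

```java
public class CursorSketch {

    // Minimal stand-in for a ledger position (hypothetical type, not
    // Pulsar's PositionImpl).
    record Position(long ledgerId, long entryId) {}

    // Simplified sketch of the rewind: the non-durable cursor is recreated
    // one entry before the requested start position, so the start message
    // itself would be redelivered. This one-entry rewind is what topic
    // stats then report as a backlog of 1.
    static Position nonDurableStartPosition(Position requested) {
        long entryId = requested.entryId() >= 0
                ? requested.entryId() - 1
                : requested.entryId();
        return new Position(requested.ledgerId(), entryId);
    }

    public static void main(String[] args) {
        // The consumer reconnects with its last received message id (3:999).
        Position start = nonDurableStartPosition(new Position(3, 999));
        System.out.println(start); // prints Position[ledgerId=3, entryId=998]
    }
}
```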

@michalcukierman (Author)

With this code, shouldn't the last message be re-delivered to the client after the subscription is created?
With the code I've provided the backlog "hangs" and no message is delivered to the consumer.

So now I see two issues:

  • the backlog is incorrect
  • the message from the backlog is not re-delivered to the consumer, even though there is code that rewinds the position one entry backward.

Keep in mind that there may be implications of the incorrect backlog:

  • some services may rely on the backlog to scale up/down (e.g. KEDA)
  • alerting rules may be fired if the backlog is not empty for a long time
  • people may get confused and spend a lot of time looking for errors

@michalcukierman (Author)

I've added a PR with the failing test. The summary is at the bottom:

        // It's OK to have a backlog, to re-consume a batch message
        assertEquals(sub.get().getMsgBacklog(), 1);

        // However the client should be able to receive the message
        Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
        consumer.acknowledge(msg);

        // And the backlog should become empty
        Awaitility.await().until(
            () -> admin.topics().getStats(topic).getSubscriptions().get(subName)
                .getMsgBacklog() == 0);


summeriiii commented Sep 11, 2024

@michalcukierman
I enabled the debug log and found that the message is ignored, so the consumer won't receive it.
[pulsar-client-io-36-5:ConsumerImpl] - [my-sub] [d0201] Ignoring message from before the startMessageId: 3:999:-1:-1

ConsumerImpl#messageReceived

            if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
                // We need to discard entries that were prior to startMessageId
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
                            consumerName, startMessageId);
                }
                uncompressedPayload.release();
                return;
            }

But the msgBacklog is still incorrect. For a non-durable subscription we don't persist the cursor info, and I don't yet know how to solve this problem.
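To make the arithmetic concrete, here is a simplified model (my own sketch, not Pulsar's actual implementation) of why the backlog gets stuck at 1: the backlog is roughly the distance between the cursor's mark-delete position and the last confirmed entry, and recreating the cursor one entry back while the client silently discards the redelivered entry means the mark-delete position never advances.

```java
public class BacklogSketch {

    // Simplified model (not Pulsar's actual code): the backlog is the number
    // of entries after the cursor's mark-delete position, up to the last
    // confirmed entry in the ledger.
    static long backlog(long lastConfirmedEntry, long markDeleteEntry) {
        return lastConfirmedEntry - markDeleteEntry;
    }

    public static void main(String[] args) {
        long lastConfirmed = 999; // 1000 messages: entry ids 0..999

        // Before unload: the consumer acked everything.
        System.out.println(backlog(lastConfirmed, 999)); // prints 0

        // After unload: the non-durable cursor is recreated at entryId - 1,
        // but the client ignores the redelivered entry (it is before the
        // startMessageId) and never acks it, so the mark-delete position
        // stays at 998 and the backlog stays at 1.
        System.out.println(backlog(lastConfirmed, 998)); // prints 1
    }
}
```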

@michalcukierman (Author)

I think that without understanding why the rewind/skip/batch logic was introduced, it's hard to propose a solution.
I don't have experience with the Pulsar implementation, but I may have a look at the codebase later today; maybe I'll find something.

@michalcukierman (Author)

@summeriiii I was not able to provide a solution yesterday (I had false-positive tests in my local environment: they passed for simple cases but failed for others).

I've noticed that two fields were added to MessageIdData in 2020:

message MessageIdData {
    required uint64 ledgerId = 1;
    required uint64 entryId  = 2;
    optional int32 partition = 3 [default = -1];
    optional int32 batch_index = 4 [default = -1];
    repeated int64 ack_set = 5;
    optional int32 batch_size = 6;

    // For the chunk message id, we need to specify the first chunk message id.
    optional MessageIdData first_chunk_message_id = 7;
}

Those are ack_set and batch_size; maybe those two can be used to check whether the message was fully consumed (and, as a result, whether a re-read of the batch is needed)?
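As a sketch of that idea (a hypothetical helper with assumed semantics: I'm assuming ack_set is the usual bitset with one bit per batch index, where a set bit means "not yet acknowledged"):

```java
import java.util.BitSet;

public class BatchAckCheck {

    // Hypothetical helper: decide whether an entry (a batch message) would
    // need to be re-read after reconnect. Assumption: ack_set is a bitset
    // with one bit per batch index, where a set bit means the index has not
    // been acknowledged yet. An empty ack_set is treated as fully consumed.
    static boolean needsRedelivery(long[] ackSet, int batchSize) {
        if (ackSet == null || ackSet.length == 0) {
            return false; // no ack_set recorded: treat the batch as fully consumed
        }
        BitSet bits = BitSet.valueOf(ackSet);
        int firstUnacked = bits.nextSetBit(0);
        // Any set bit within [0, batchSize) means some index is still unacked.
        return firstUnacked >= 0 && firstUnacked < batchSize;
    }

    public static void main(String[] args) {
        // Fully acked batch of 4: empty ack_set, no re-read needed.
        System.out.println(needsRedelivery(new long[0], 4));        // prints false
        // Batch of 4 with index 2 still unacked: bit 2 set, re-read needed.
        System.out.println(needsRedelivery(new long[]{0b100L}, 4)); // prints true
    }
}
```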

I have tried to modify ServerCnx to not produce BatchMessageIdImpl when there is no batch index explicitly given:

        final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
                subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
                subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
                : null;

but unfortunately I got failing tests related to startMessageIdInclusive.

Another alternative would be to fix the backlog after the client has ignored the message, but I think the issue requires attention from someone who knows the codebase better than me.
