[Bug] Msg backlog & unack msg remains when using acknowledgeAsync #21958

Open
1 of 2 tasks
pqab opened this issue Jan 23, 2024 · 24 comments

Comments

@pqab

pqab commented Jan 23, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Version

3.0.0

Minimal reproduce step

  1. Publish 600k messages

  2. Start 2 consumers with different subscription name and subscribe from Earliest

one with async ack

Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))

another one with sync ack

Mono.fromRunnable(() -> consumer.acknowledge(message))
  3. Wait until both consumers have finished
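
For the "wait until finished" step, it helps to make sure every async ack future has completed before reading the topic stats. The following is a minimal JDK-only sketch of that idea. `fakeAcknowledgeAsync` is a hypothetical stand-in for `consumer.acknowledgeAsync(message)`, and the class and method names are illustrative assumptions, not code from the reproduction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class AwaitAcksSketch {

    // Hypothetical stand-in for consumer.acknowledgeAsync(message):
    // the returned future completes on another thread.
    static CompletableFuture<Void> fakeAcknowledgeAsync(Executor executor, AtomicInteger completed) {
        return CompletableFuture.runAsync(completed::incrementAndGet, executor);
    }

    // Issues one simulated ack per message and blocks until every ack future has completed.
    static int ackAllAndWait(int messageCount, int threads) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger completed = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < messageCount; i++) {
                pending.add(fakeAcknowledgeAsync(executor, completed));
            }
            // Only after allOf(...) completes is it meaningful to compare counters with broker stats.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            return completed.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed acks: " + ackAllAndWait(1000, 8));
    }
}
```

With the real client, a completed future only reflects what the client reports, so the broker-side stats remain the source of truth for backlog and unacked counts.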

What did you expect to see?

msg backlog & unack message should be 0 for both acknowledge & acknowledgeAsync subscription

What did you see instead?

A few messages remain in the backlog and as unacked messages, even though we received the ack callback when using acknowledgeAsync. acknowledge works fine.

Topic stats for the acknowledgeAsync subscription for reference

{
    "msgRateOut" : 382.4166635563445,
    "msgThroughputOut" : 1143831.4573635042,
    "bytesOutCounter" : 1590183426,
    "msgOutCounter" : 531437,
    "msgRateRedeliver" : 0.0,
    "messageAckRate" : 397.2166640450367,
    "chunkedMessageRate" : 0,
    "msgBacklog" : 4,
    "backlogSize" : 572603343,
    "earliestMsgPublishTimeInBacklog" : 0,
    "msgBacklogNoDelayed" : 4,
    "blockedSubscriptionOnUnackedMsgs" : false,
    "msgDelayed" : 0,
    "unackedMessages" : 7,
    "type" : "Key_Shared",
    "msgRateExpired" : 0.0,
    "totalMsgExpired" : 0,
    "lastExpireTimestamp" : 1706002395690,
    "lastConsumedFlowTimestamp" : 1706002431797,
    "lastConsumedTimestamp" : 1706002428241,
    "lastAckedTimestamp" : 1706002436543,
    "lastMarkDeleteAdvancedTimestamp" : 1706002000095,
    "consumers" : [ {
        "msgRateOut" : 382.4166635563445,
        "msgThroughputOut" : 1143831.4573635042,
        "bytesOutCounter" : 598198012,
        "msgOutCounter" : 200000,
        "msgRateRedeliver" : 0.0,
        "messageAckRate" : 397.2166640450367,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumer",
        "availablePermits" : 1000,
        "unackedMessages" : 7,
        "avgMessagesPerEntry" : 15,
        "blockedConsumerOnUnackedMsgs" : false,
        "readPositionWhenJoining" : "1018:10411",
        "address" : "/100.96.65.127:20857",
        "connectedSince" : "2024-01-23T09:26:34.035732392Z",
        "clientVersion" : "Pulsar-Java-v3.0.0",
        "lastAckedTimestamp" : 1706002436543,
        "lastConsumedTimestamp" : 1706002428241,
        "lastConsumedFlowTimestamp" : 1706002431797,
        "keyHashRanges" : [ "[0, 6276111]", "[6276112, 13422723]", "[13422724, 52719097]", "[52719098, 60122502]", "[60122503, 74675312]", "[74675313, 161996019]", "[161996020, 207509307]", "[207509308, 220775154]", "[220775155, 229655103]", "[229655104, 234228609]", "[234228610, 276636200]", "[276636201, 324880668]", "[324880669, 369408646]", "[369408647, 374232013]", "[374232014, 379665156]", "[379665157, 380576699]", "[380576700, 405888400]", "[405888401, 428673014]", "[428673015, 453720349]", "[453720350, 488351370]", "[488351371, 496052795]", "[496052796, 504603928]", "[504603929, 508760821]", "[508760822, 526107528]", "[526107529, 576532446]", "[576532447, 580447007]", "[580447008, 587033352]", "[587033353, 604050605]", "[604050606, 607110270]", "[607110271, 611987246]", "[611987247, 627803480]", "[627803481, 628603516]", "[628603517, 643340895]", "[643340896, 649016535]", "[649016536, 682844752]", "[682844753, 723271437]", "[723271438, 725352428]", "[725352429, 753192194]", "[753192195, 798356347]", "[798356348, 824987130]", "[824987131, 838415369]", "[838415370, 853347508]", "[853347509, 869121139]", "[869121140, 937189723]", "[937189724, 1004046645]", "[1004046646, 1013552657]", "[1013552658, 1063116829]", "[1063116830, 1072226625]", "[1072226626, 1102842607]", "[1102842608, 1113396043]", "[1113396044, 1133270607]", "[1133270608, 1149712306]", "[1149712307, 1196163934]", "[1196163935, 1218114318]", "[1218114319, 1239267311]", "[1239267312, 1283886353]", "[1283886354, 1298017483]", "[1298017484, 1300597583]", "[1300597584, 1311995628]", "[1311995629, 1407745525]", "[1407745526, 1487107354]", "[1487107355, 1500070137]", "[1500070138, 1527269282]", "[1527269283, 1579052216]", "[1579052217, 1584997034]", "[1584997035, 1595017626]", "[1595017627, 1601176083]", "[1601176084, 1618519791]", "[1618519792, 1641494763]", "[1641494764, 1656777545]", "[1656777546, 1681398228]", "[1681398229, 1697816514]", "[1697816515, 1706859249]", "[1706859250, 1720068125]", 
"[1720068126, 1779743735]", "[1779743736, 1784442894]", "[1784442895, 1823221256]", "[1823221257, 1824702978]", "[1824702979, 1838089487]", "[1838089488, 1857634960]", "[1857634961, 1861247796]", "[1861247797, 1863792279]", "[1863792280, 1937071475]", "[1937071476, 1941970878]", "[1941970879, 1965632398]", "[1965632399, 1970489707]", "[1970489708, 1979412755]", "[1979412756, 1983921632]", "[1983921633, 2008961115]", "[2008961116, 2016328150]", "[2016328151, 2020236760]", "[2020236761, 2023857462]", "[2023857463, 2032948319]", "[2032948320, 2045854070]", "[2045854071, 2060460824]", "[2060460825, 2067248154]", "[2067248155, 2103376046]", "[2103376047, 2127999799]", "[2127999800, 2131945474]", "[2131945475, 2143021740]" ],
        "metadata" : { },
        "lastAckedTime" : "2024-01-23T09:33:56.543Z",
        "lastConsumedTime" : "2024-01-23T09:33:48.241Z"
    } ],
    "isDurable" : true,
    "isReplicated" : false,
    "allowOutOfOrderDelivery" : false,
    "keySharedMode" : "AUTO_SPLIT",
    "consumersAfterMarkDeletePosition" : { },
    "nonContiguousDeletedMessagesRanges" : 4,
    "nonContiguousDeletedMessagesRangesSerializedSize" : 71,
    "delayedMessageIndexSizeInBytes" : 0,
    "subscriptionProperties" : { },
    "filterProcessedMsgCount" : 0,
    "filterAcceptedMsgCount" : 0,
    "filterRejectedMsgCount" : 0,
    "filterRescheduledMsgCount" : 0,
    "durable" : true,
    "replicated" : false
}

Anything else?

We ran it multiple times, and every time a few backlog and unacked messages were left for the acknowledgeAsync subscription.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@pqab
Author

pqab commented Jan 23, 2024

@semistone
could you share the code to reproduce the issue?

@semistone

Our test uses batch receive with a maximum of 1000 events,
then processes those 1000 events concurrently and in parallel:

Flux.fromIterable(events).parallel().runOn(Schedulers.fromExecutor(this.executorService))
.flatMap(event -> { // handle event and ack }) 

Originally we used acknowledgeAsync, which seems to have the issue:

				ackMono = Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))
						.doOnSuccess(v -> ackCount.incrementAndGet());

so we replaced it with

	@SneakyThrows
	private void acknowledge(Message<?> message) {
		ackLock.lock();
		try {
			consumer.acknowledge(message);
			ackCount.incrementAndGet();
		} finally {
			ackLock.unlock();
		}
	}
				Mono<Void> runnable = Mono.fromRunnable(() -> this.acknowledge(message));
				ackMono = runnable.subscribeOn(Schedulers.fromExecutor(this.executorService));

which forces all acknowledge calls to be serialized with a ReentrantLock.
That seems to work.

I could try to write test code later if needed.

@lhotari
Member

lhotari commented Jan 23, 2024

Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))

A Mono doesn't do anything unless it is subscribed. It would be useful to have a simple Java class or test case that runs the logic that you are using.
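
The laziness mentioned here can be illustrated without Reactor. The sketch below is a JDK-only analogy, not Reactor code: a `Supplier` defers the async call in roughly the way `Mono.fromCompletionStage(() -> ...)` defers it until subscription. `fakeAcknowledgeAsync` and the counter are hypothetical stand-ins for the real ack call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazyAckSketch {

    // Hypothetical stand-in for consumer.acknowledgeAsync(message); counts each real invocation.
    static CompletableFuture<Void> fakeAcknowledgeAsync(AtomicInteger acksSent) {
        acksSent.incrementAndGet();
        return CompletableFuture.completedFuture(null);
    }

    // Building the deferred ack does not send anything; the call happens only on get().
    static Supplier<CompletableFuture<Void>> deferredAck(AtomicInteger acksSent) {
        return () -> fakeAcknowledgeAsync(acksSent);
    }

    public static void main(String[] args) {
        AtomicInteger acksSent = new AtomicInteger();
        Supplier<CompletableFuture<Void>> ack = deferredAck(acksSent);
        System.out.println("acks sent after building: " + acksSent.get());
        ack.get().join(); // analogous to subscribing to the Mono
        System.out.println("acks sent after triggering: " + acksSent.get());
    }
}
```

If a pipeline builds such a deferred ack but never triggers it, no ack is ever sent, which is why seeing the full subscription logic matters for diagnosing the report.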

@lhotari
Member

lhotari commented Jan 23, 2024

@pqab
Author

pqab commented Jan 24, 2024

we published around 1m of messages, and we are able to reproduce with this code
v3.1.2...semistone:pulsar:test/flux-test

bin/pulsar-perf consume persistent://tenant1/namespace1/topic1 --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --auth-params '{"tlsCertFile":"conf/superuser.cer","tlsKeyFile":"conf/superuser.key.pem"}' --test-reactor -sp Earliest -st Key_Shared -ss sub1

The unacked message count keeps increasing and the available permits become negative, so the consumer can't poll more events unless we restart it to have the events redelivered to the consumer.

@lhotari
Member

lhotari commented Jan 24, 2024

3.0.0

Is there a chance to use 3.0.2? A lot of bugs have been fixed in 3.0.1 and 3.0.2. This applies to both the broker and the client.

@lhotari
Member

lhotari commented Jan 24, 2024

we published around 1m of messages, and we are able to reproduce with this code
v3.1.2...semistone:pulsar:test/flux-test

Thanks for sharing the repro app.
I'll give it a try soon. One question about the repro, you have -st Key_Shared. Does the problem reproduce with the Shared subscription type?

@pqab
Author

pqab commented Jan 25, 2024

we published around 1m of messages, and we are able to reproduce with this code
v3.1.2...semistone:pulsar:test/flux-test

Thanks for sharing the repro app. I'll give it a try soon. One question about the repro, you have -st Key_Shared. Does the problem reproduce with the Shared subscription type?

Yes, I ran it again with the Shared subscription type and it also happens. I think the type doesn't matter.

@pqab
Author

pqab commented Jan 25, 2024

3.0.0

Is there a chance to use 3.0.2? A lot of bugs have been fixed in 3.0.1 and 3.0.2. This applies to both the broker and the client.

The original report came from our application using the 3.0.0 client with a 3.1.2 broker. We are going to upgrade the client to 3.0.2 for now. The reproduction code above was run from the server, so both client and broker are 3.1.2.

@lhotari
Member

lhotari commented May 31, 2024

This is possibly related to #22601 / #22810.

@semistone

semistone commented Jun 6, 2024

We tested again and it still happens, but we found that by default the Pulsar client doesn't wait for the ack response and
returns an already completed CompletableFuture directly.

https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L264

so we turned on
https://pulsar.apache.org/api/client/3.0.x/org/apache/pulsar/client/api/ConsumerBuilder.html#isAckReceiptEnabled(boolean)

because with that option enabled, the client takes a read lock:
https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L261

so the concurrency issue disappears.

If I test without enabling that option,
the client doesn't wait for the acknowledgement, so it looks like a memory leak in my performance test
because too many currentIndividualAckFuture instances are queued.

So maybe the concurrency issue is still there, or maybe too many currentIndividualAckFuture instances just pile up.

But at least we can enable isAckReceiptEnabled to fix this issue.
I guess always taking that read lock, with or without isAckReceiptEnabled, would fix this issue as well.
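
To make the difference described above concrete, here is a small JDK-only model of the two behaviours. It is a simplified sketch based only on the description in this comment, not the actual `PersistentAcknowledgmentsGroupingTracker` code. `AckReceiptModel`, `brokerConfirms`, and `unconfirmedCount` are hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class AckReceiptModel {
    private final boolean ackReceiptEnabled;
    // Acks handed to the (simulated) tracker but not yet confirmed by the (simulated) broker.
    private final Map<Long, CompletableFuture<Void>> unconfirmed = new ConcurrentHashMap<>();

    AckReceiptModel(boolean ackReceiptEnabled) {
        this.ackReceiptEnabled = ackReceiptEnabled;
    }

    // Without receipts the caller gets an already completed future, even though
    // the ack is still unconfirmed. With receipts the caller gets the pending future.
    CompletableFuture<Void> acknowledgeAsync(long messageId) {
        CompletableFuture<Void> receipt = new CompletableFuture<>();
        unconfirmed.put(messageId, receipt);
        return ackReceiptEnabled ? receipt : CompletableFuture.completedFuture(null);
    }

    // Simulates the broker's ack response arriving.
    void brokerConfirms(long messageId) {
        CompletableFuture<Void> receipt = unconfirmed.remove(messageId);
        if (receipt != null) {
            receipt.complete(null);
        }
    }

    int unconfirmedCount() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        AckReceiptModel model = new AckReceiptModel(false);
        CompletableFuture<Void> ack = model.acknowledgeAsync(1L);
        System.out.println("done=" + ack.isDone() + ", unconfirmed=" + model.unconfirmedCount());
    }
}
```

In this model an application-side counter driven by the returned future can reach the full message count while acks are still unconfirmed, which matches the symptom reported here. With the real client, the corresponding switch is the `isAckReceiptEnabled(true)` consumer builder option linked above.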

@lhotari
Member

lhotari commented Jun 6, 2024

msg backlog & unack message should be 0 for both acknowledge & acknowledgeAsync subscription

regarding unack message counts, #22657 is possibly related, see #22657 (comment)

@lhotari
Member

lhotari commented Sep 3, 2024

There has been an ack issue with batch index acknowledgements, #22353. That must be a different issue.

@lhotari
Member

lhotari commented Sep 4, 2024

I made an attempt to reproduce this using Pulsar client directly. The problem didn't reproduce with https://github.com/lhotari/pulsar-playground/blob/master/src/main/java/com/github/lhotari/pulsar/playground/TestScenarioAckIssue.java . (The test code is fairly complex due to the counters to validate behavior and since I had the attempt to increase chances of race conditions.)

@lhotari
Member

lhotari commented Sep 4, 2024

I'll attempt to reproduce with the provided changes to pulsar-perf.

@lhotari
Member

lhotari commented Sep 4, 2024

we published around 1m of messages, and we are able to reproduce with this code
v3.1.2...semistone:pulsar:test/flux-test

rebased it over master in master...lhotari:pulsar:lh-issue21958-flux-test

@lhotari
Member

lhotari commented Sep 4, 2024

I'm not able to reproduce.

compiling Pulsar branch with rebased flux-test patch for pulsar-perf

git clone --depth 1 -b lh-issue21958-flux-test https://github.com/lhotari/pulsar
cd pulsar
mvn -Pcore-modules,-main -T 1C clean install -DskipTests -Dspotbugs.skip=true -Dcheckstyle.skip=true -Dlicense.skip=true -DnarPluginPhase=none

running Pulsar

rm -rf data
PULSAR_STANDALONE_USE_ZOOKEEPER=1 bin/pulsar standalone -nss -nfw 2>&1 | tee standalone.log

running consumer

bin/pulsar-perf consume test --test-reactor -sp Earliest -st Key_Shared -ss sub1

running producer

bin/pulsar-perf produce test -mk random -r 50000

@semistone @pqab Are you able to reproduce with the master branch version of Pulsar? How about other branches/releases? Is this issue resolved?

@lhotari
Member

lhotari commented Sep 5, 2024

I made an attempt to reproduce this using Pulsar client directly. The problem didn't reproduce with https://github.com/lhotari/pulsar-playground/blob/master/src/main/java/com/github/lhotari/pulsar/playground/TestScenarioAckIssue.java . (The test code is fairly complex due to the counters to validate behavior and since I had the attempt to increase chances of race conditions.)

One possible variation to this scenario would be to test together with topic unloading events.

@lhotari
Member

lhotari commented Sep 5, 2024

Related issue #22709

@semistone

semistone commented Sep 5, 2024

Let me check tomorrow.

@lhotari
Member

lhotari commented Sep 5, 2024

Let me check tomorrow.

@semistone thanks, that would be helpful. If it reproduces only within a cluster with multiple nodes and other traffic, that could mean that a load balancing event is triggering the problem. Currently, in-flight acknowledgements could get lost when this happens. Usually this gets recovered, but it's possible that there's a race condition where the acknowledgements get lost and the message doesn't get redelivered during the unload/reconnection event triggered by load balancing. It should also be possible to simulate this scenario by triggering topic unloads with the admin API.

@semistone

semistone commented Sep 6, 2024

Our cluster is already in production, so I can't test it there.
Instead I tested Pulsar version apache-pulsar-3.3.1 standalone on my local machine.

It seems more difficult to reproduce this issue compared to when we first reported it,
but after retrying many times I could still see something strange.
My steps are:

  1. start standalone pulsar
  2. publish 6M messages and consume subscription sub1 + sub2 and stop consumer/publisher and check 6M messages in backlog
  3. unsubscribe sub bin/pulsar-admin topics unsubscribe persistent://public/default/my-topic -s=sub
  4. consume by bin/pulsar-perf consume my-topic --test-reactor -sp Earliest -st Key_Shared -ss sub -n 20
  5. consume sub and monitor with watch -n 5 'bin/pulsar-admin topics stats persistent://public/default/my-topic | grep -i msgBack'
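
The monitoring in the last step can also be done programmatically. Below is a minimal JDK-only sketch that pulls a numeric field such as `msgBacklog` or `unackedMessages` out of stats JSON text with a regular expression. It assumes the JSON has already been fetched (for example from the `pulsar-admin topics stats` output) and returns only the first occurrence of the field. `StatsFieldSketch` and `firstLongField` are hypothetical names.

```java
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StatsFieldSketch {

    // Returns the first integer value of the exactly named field, e.g. "msgBacklog" : 4.
    // The closing quote in the pattern keeps "msgBacklog" from matching "msgBacklogNoDelayed".
    static OptionalLong firstLongField(String statsJson, String field) {
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher matcher = pattern.matcher(statsJson);
        return matcher.find()
                ? OptionalLong.of(Long.parseLong(matcher.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Sample values taken from the stats posted at the top of this issue.
        String sample = "{ \"msgBacklog\" : 4, \"msgBacklogNoDelayed\" : 4, \"unackedMessages\" : 7 }";
        System.out.println("msgBacklog=" + firstLongField(sample, "msgBacklog"));
        System.out.println("unackedMessages=" + firstLongField(sample, "unackedMessages"));
    }
}
```

A regex is enough for a quick check like the grep above. For stats with several subscriptions, a real JSON parser would be the safer choice.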

After retrying many times, I saw the consumer stop only once, like:

2024-09-06T17:33:01,433+0900 [main] INFO  org.apache.pulsar.testclient.PerformanceConsumer - Throughput received:    1045 msg --- 0.000  msg/s --- 0.000 Mbit/s  --- Latency: mean: 0.000 ms - med: 0 - 95pct: 0 - 99pct: 0 - 99.9pct: 0 - 99.99pct: 0 - Max: 0

but I could still see a backlog,
like "msgBacklog" : 996329.
Unlike before, when unackedMessages was nonzero,
this time unackedMessages is 0.

I also found that once it stopped consuming for about 1 minute and recovered later.
If we push more messages, it continues to consume.

I will try to test master branch and double check any mistake in my testing later.

@lhotari
Member

lhotari commented Sep 7, 2024

@semistone there are multiple issues contributing to this, here's one update: #22709 (comment)

@lhotari
Member

lhotari commented Sep 7, 2024

I have created a proposal "PIP-377: Automatic retry for failed acknowledgements", #23267 (rendered doc) . Discussion thread https://lists.apache.org/thread/7sg7hfv9dyxto36dr8kotghtksy1j0kr
