-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support acknowledging a list of message IDs #1301
Support acknowledging a list of message IDs #1301
Conversation
This feature is good to me, I have some questions:
|
The 1st questionWhat's confusing is the tracking message ID itself. The existing func TestMyAck(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})
assert.Nil(t, err)
defer client.Close()
topic := fmt.Sprintf("test-my-ack-%v", time.Now().Nanosecond())
createConsumer := func() Consumer {
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
SubscriptionInitialPosition: SubscriptionPositionEarliest,
Type: Shared,
AckWithResponse: true,
})
assert.Nil(t, err)
return consumer
}
consumer := createConsumer()
sendMessages(t, client, topic, 0, 2, true) // send 0 and 1 in the same batch
msgs := receiveMessages(t, consumer, 2)
for i := 0; i < 2; i++ {
fmt.Println("Received message: ", string(msgs[i].Payload()), msgs[i].ID())
}
if err := consumer.AckID(msgs[0].ID()); err != nil {
fmt.Println("Ack message 0 failed: ", err.Error())
} else {
fmt.Println("Ack message 0 success")
}
consumer.Close()
consumer = createConsumer()
msgs = receiveMessages(t, consumer, 1)
fmt.Println("Received message: ", string(msgs[0].Payload()), msgs[0].ID())
} Outputs:
From the perspective from user side:
Actually this API implements for msgID, err := range consumer.AckIDList([]MessageID{msgs[0].ID()}) {
fmt.Println("Failed to acknowledge ", msgID, err.Error())
} The outputs will be:
P.S. we should fix the Users should add the failed message ID to the next message ID list passed to IMO, we should make batch index ACK enabled by default for both client side and server side. The current default behavior is really confusing. The 2nd question
|
@nodece I added |
Your explanation is clear, I agree with you:
Default to batch index ACK is disabled. When disabled, the users must ack all batch messages by
You only ack the first message, the client doesn't send the ack request to the broker, so you still receive the first message after restart the consumer. I remember the Java client has the same behavior, if I am wrong, please let me know.
You can refactor the You can also disscus this issue on the dev mailing list. |
There is no way to let the client know whether the ack request was sent. That's the point. In addition, users should never care if the ack was sent. They only care if the ack succeeded.
Yeah, but this behavior is wrong. I don't want to blame the author but the error handling is really not taken care of. Happy path is always easy to write.
Oh I see. Let me think a better way to reuse the code.
I plan to write a PIP to explain these things, including why to enable batch index ACK by default, the mess of acknowledge APIs' semantics. |
@nodece I reused the tracking message ID's tracker and used the semantics that ACK on an incomplete message ID in the batch does not return an error. Please check the latest tests. As for reusing the |
panic: runtime error: invalid memory address or nil pointer dereference [recovered] panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x2 addr=0x38 pc=0x1033af634] goroutine 28 [running]: testing.tRunner.func1.2({0x103773580, 0x103e9bde0}) /usr/local/go/src/testing/testing.go:1631 +0x1c4 testing.tRunner.func1() /usr/local/go/src/testing/testing.go:1634 +0x33c panic({0x103773580?, 0x103e9bde0?}) /usr/local/go/src/runtime/panic.go:770 +0x124 github.com/apache/pulsar-client-go/pulsar.(*consumer).checkMsgIDPartition(0x140001126c8, {0x0?, 0x0?}) /Users/xuyunze/github.com/bewaremypower/pulsar-client-go/pulsar/consumer_impl.go:757 +0x24
d4687ce
to
14cbf16
Compare
@RobertIndie @nodece @crossoverJie @shibd Thanks for your reviews. The For the simple consumer case that only 1 topic is subscribed, since only 1 ACK request will be sent, these message IDs either all fail or all succeed. The only exceptional case is that invalid message IDs are passed. However, in this case, there is no way to handle these message IDs because acknowledging them will always fail. Hence I just return a trivial However, if the consumer subscribes multiple topics, there might be multiple See the API documents and the tests for details. PTAL again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some small comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/LGTM
Motivation
For
Shared
andKey_Shared
subscriptions, if some messages failed to acknowledge, these messages would never be dispatched to the consumer until restarted. If the number of unacknowledged messages reached the threshold, the broker would never dispatch messages anymore. However, the consumer does not have a chance to check which messages failed to acknowledged.Even if this case was not hit, if the consumer restarted after consuming many messages, the old unacknowledged messages would be delivered again, which is very confusing and might affect the business logic.
Therefore, we can only enable
AckWithResponse
to know which messages failed to acknowledge. Unfortunately, currently the Go SDK only supports acknowledging single messages. It harms the performance significantly.To solve this solution, this PR adds an API to acknowledge a list of messages.
Users can save the failed message IDs and add them again in the next
AckIDList
call.Modifications
AckIDList
API and reuse the logic ofack_grouping_tracker.go
to convert user provided message IDs to the message IDs in the ACK requestsinternalAckList
and wait for the response error via a error channelTestAckIDList
to verify the case that a message ID list has message IDs of non-batched messages, whole batched messages or partial batched messages because the behaviors are different if the batch index ACK is enabledTestMultiTopicAckIDList
to verify the multi-topics case, including regex subscription.Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation