-
-
Notifications
You must be signed in to change notification settings - Fork 678
Use AckExplicitPolicy
instead of AckAllPolicy
#3288
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #3288 +/- ##
==========================================
- Coverage 65.71% 65.55% -0.17%
==========================================
Files 509 509
Lines 57420 57466 +46
==========================================
- Hits 37734 37670 -64
- Misses 15851 15928 +77
- Partials 3835 3868 +33
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
@@ -49,7 +49,7 @@ import ( | |||
) | |||
|
|||
// TODO: Does this value make sense? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this comment anymore?
// The consumer already exists, try to update if necessary. | ||
if info != nil { | ||
switch { | ||
case info.Config.AckWait.Nanoseconds() != consumerConfig.AckWait.Nanoseconds(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could use a comment.
Something like "The existing consumer's AckWait timeout does not match the current expected timeout value"
As a note: we probably should revisit |
Fixes #3240 and potentially a root cause for state resets.
While testing, I've had added some more debug logging:
So we did receive the same event over and over again. Given they are
KindNew
, we don't short circuit if we already processed them, which potentially caused the state to be calculated with a now wrong state snapshot.Also fixes the back pressure metric. We now correctly increment the counter once we sent the message to NATS and decrement it once we actually processed an event.