-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Offset validation observer #1014
base: main
Are you sure you want to change the base?
Conversation
patriknw
commented
Sep 19, 2023
- more insights to offset progress
- InternalStableApi
* more insights to offset progress * InternalStableApi
… the source field
@@ -236,6 +239,8 @@ private[projection] class R2dbcOffsetStore( | |||
// To avoid delete requests when no new offsets have been stored since previous delete | |||
private val idle = new AtomicBoolean(false) | |||
|
|||
private var validationObservers: immutable.Seq[R2dbcOffsetValidationObserver] = Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't it need to be volatile or an atomic (or a copy method of some kind)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's set once at startup, and as far as I can see it's not important if we would miss a few notifications. The purpose is not to track each and every offset, but see overall progress or what is going on. I can add a comment.
if (validationObservers.nonEmpty) | ||
result.foreach(_.foreach { | ||
case (env, validation) => notifyValidationObserver(env, validation) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it maybe rather be andThen:ed onto the returned future so the observer is a part of the backpressure rather than potentially queued up in parallel on the ec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, I was thinking that it shouldn't slow down the offset store, but it wouldn't be good if the observer is slower then the offset store
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed in 5f54fc5
case object Accepted extends OffsetValidation | ||
case object Duplicate extends OffsetValidation | ||
case object Rejected extends OffsetValidation | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A thought: these could be public do-not-extend traits with the internal validation concrete adt types extending them so that no translation step is actually needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed in b0ca096
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM