Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Offset validation observer #1014

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

patriknw
Copy link
Member

  • more insights to offset progress
  • InternalStableApi

* more insights to offset progress
* InternalStableApi
@@ -236,6 +239,8 @@ private[projection] class R2dbcOffsetStore(
// To avoid delete requests when no new offsets have been stored since previous delete
private val idle = new AtomicBoolean(false)

private var validationObservers: immutable.Seq[R2dbcOffsetValidationObserver] = Nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't it need to be volatile or an atomic (or a copy method of some kind)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's set once at startup, and as far as I can see it's not important if we would miss a few notifications. The purpose is not to track each and every offset, but see overall progress or what is going on. I can add a comment.

if (validationObservers.nonEmpty)
result.foreach(_.foreach {
case (env, validation) => notifyValidationObserver(env, validation)
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it maybe rather be andThen:ed onto the returned future so the observer is a part of the backpressure rather than potentially queued up in parallel on the ec?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I was thinking that it shouldn't slow down the offset store, but it wouldn't be good if the observer is slower then the offset store

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed in 5f54fc5

case object Accepted extends OffsetValidation
case object Duplicate extends OffsetValidation
case object Rejected extends OffsetValidation
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thought: these could be public do-not-extend traits with the internal validation concrete adt types extending them so that no translation step is actually needed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed in b0ca096

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants