Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auto offset commits (the right way) #782

Merged
merged 23 commits into from
Jan 2, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -353,4 +353,85 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
.recordStream(pollTimeout)
.evalMap(r => process(r) <* consumer.commitSync(r.nextOffset))

/** Returns a stream that processes records using the specified function,
* committing offsets for successfully processed records, either after
* processing the specified number of records, or after the specified time
* has elapsed since the last offset commit. If the processing function
* returns a failure, the stream will halt with that failure, and the offsets
* will not be committed. This is at-least-once processing: records for which
* the function returns success, but whose offset have not yet been committed
* will be reprocessed after a subsequent failure. In some use cases this
* pattern is more appropriate than just using auto-offset-commits, since it
* will not commit offsets for failed records when the consumer is closed.
* The consumer must be configured to disable offset auto-commits.
*/
def readProcessCommit2[A](
zcox marked this conversation as resolved.
Show resolved Hide resolved
pollTimeout: FiniteDuration,
maxRecordCount: Long = 1000L,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone passes in a negative number, or a negative time would anything unexpected happen? I am not sure how consumer.recordStream(-2.seconds) would deal with it, but in the other two cases, it would mean that we would always be committing for every record. The question then becomes whether or not this is desirable and if we think that is obvious enough at the call site?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. This probably needs more validation of inputs.

maxCommitTime: Long = 60000L,
)(
process: ConsumerRecord[K, V] => F[A]
)(implicit C: Clock[F], S: Sync[F]): Stream[F, A] =
zcox marked this conversation as resolved.
Show resolved Hide resolved
for {
state <- Stream.eval(
Ref.of[F, OffsetCommitState](OffsetCommitState.empty)
)
record <- consumer.recordStream(pollTimeout)
a <- Stream.eval(process(record))
s <- Stream.eval(state.updateAndGet(_.update(record)))
zcox marked this conversation as resolved.
Show resolved Hide resolved
now <- Stream.eval(C.realTime.map(_.toNanos))
() <- Stream.eval(
s.needToCommit(maxRecordCount, now, maxCommitTime)
.traverse_(os =>
consumer.commitSync(os) *>
state.update(_.reset(now))
)
)
} yield a

case class OffsetCommitState(
offsets: Map[TopicPartition, Long],
recordCount: Long,
lastCommitTime: Long,
) {
def update(record: ConsumerRecord[_, _]): OffsetCommitState =
copy(
offsets = offsets + (new TopicPartition(
record.topic,
record.partition,
) -> record.offset),
recordCount = recordCount + 1,
)

def needToCommit(
maxRecordCount: Long,
now: Long,
maxCommitTime: Long,
): Option[Map[TopicPartition, OffsetAndMetadata]] =
if (
zcox marked this conversation as resolved.
Show resolved Hide resolved
recordCount >= maxRecordCount || (now - lastCommitTime) >= maxCommitTime
)
nextOffsets.some
else
none

def nextOffsets: Map[TopicPartition, OffsetAndMetadata] =
offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap

def reset(time: Long): OffsetCommitState =
copy(
zcox marked this conversation as resolved.
Show resolved Hide resolved
offsets = Map.empty,
recordCount = 0L,
lastCommitTime = time,
)
}

object OffsetCommitState {
val empty = OffsetCommitState(
offsets = Map.empty,
recordCount = 0L,
lastCommitTime = 0L,
)
}

}
Loading