Auto offset commits (the right way) #782

Merged (23 commits) on Jan 2, 2024
89 changes: 89 additions & 0 deletions core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala
@@ -353,4 +353,93 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
.recordStream(pollTimeout)
.evalMap(r => process(r) <* consumer.commitSync(r.nextOffset))

/** Returns a stream that processes records using the specified function,
* committing offsets for successfully processed records, either after
* processing the specified number of records, or after the specified time
* has elapsed since the last offset commit. If the processing function
* returns a failure, offsets for records successfully processed before that
* failure will be committed, and then the stream will halt with that
* failure. This is at-least-once processing: after a restart, the record
* that failed will be reprocessed. In some use cases this pattern is more
* appropriate than just using auto-offset-commits, since it will not commit
* offsets for failed records when the consumer is closed. The consumer must
* be configured to disable offset auto-commits.
*/
def processingAndCommitting[A](
Contributor:
Does Stream.groupWithin help here? It emits chunks on the maxRecordCount-and-maxElapsedTime cadence.

Collaborator (author):
TIL Stream.groupWithin. Reading its docs, IIUC it will buffer up elements of the input stream until it obtains a certain number or a timeout is exceeded. For a Kafka consumer, I don't think we'd want that behavior, since it would introduce delay.

We want the elements to flow from the input stream as they're read from Kafka, and to take an action (i.e. commit offsets) after a number of records or a timeout. Stream.groupWithin is close to that, but if it delays records going to the process function, we wouldn't want that.
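
For illustration only, a rough sketch (not from this PR) of what a groupWithin-based variant could look like, assuming fs2's groupWithin(chunkSize, timeout) and the nextOffset helper used above; every record waits in the buffer until its chunk is emitted, which is the delay in question:

// Hypothetical alternative, NOT the approach taken in this PR:
// records are buffered into chunks before any of them are processed.
consumer
  .recordStream(pollTimeout)
  .groupWithin(maxRecordCount.toInt, maxElapsedTime)
  .evalMap { chunk =>
    chunk.traverse(process) <* // nothing is processed until the chunk is emitted
      chunk.last.traverse_(r => consumer.commitSync(r.nextOffset))
  }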

Contributor:
Not if it came after L390, but I would need to think about the onError on line 385. Don't let this block it, but I may play type tetris later.

pollTimeout: FiniteDuration,
maxRecordCount: Long = 1000L,
Collaborator:
If someone passes in a negative number, or a negative time, would anything unexpected happen? I am not sure how consumer.recordStream(-2.seconds) would deal with it, but in the other two cases it would mean that we would always be committing after every record. The question then becomes whether this is desirable, and whether we think that is obvious enough at the call site.

Collaborator (author):
Good point. This probably needs more validation of inputs.
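
For illustration, a minimal sketch of the kind of guard that could be added (hypothetical, not part of this change):

// Hypothetical argument checks, not included in this PR:
// fail fast instead of silently committing after every record.
require(maxRecordCount > 0L, s"maxRecordCount must be positive, was $maxRecordCount")
require(maxElapsedTime > Duration.Zero, s"maxElapsedTime must be positive, was $maxElapsedTime")
require(pollTimeout > Duration.Zero, s"pollTimeout must be positive, was $pollTimeout")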

maxElapsedTime: FiniteDuration = 60.seconds,
)(
process: ConsumerRecord[K, V] => F[A]
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
for {
state <- Stream.eval(
C.monotonic
.flatMap(now =>
Ref.of[F, OffsetCommitState](OffsetCommitState.empty(now))
)
)
record <- consumer.recordStream(pollTimeout)
a <- Stream.eval(
process(record)
.onError(_ =>
// we can still commit offsets for records that were successfully processed before this stream halts, the consumer is closed, etc.
// this is still at-least-once processing, but it minimizes the amount of reprocessing after a restart
state.get.flatMap(s => consumer.commitSync(s.nextOffsets))
)
)
s <- Stream.eval(state.updateAndGet(_.update(record)))
now <- Stream.eval(C.monotonic)
() <- Stream.eval(
s.needToCommit(maxRecordCount, now, maxElapsedTime)
.traverse_(os =>
consumer.commitSync(os) *>
state.update(_.reset(now))
)
)
} yield a

case class OffsetCommitState(
offsets: Map[TopicPartition, Long],
recordCount: Long,
lastCommitTime: FiniteDuration,
) {
def update(record: ConsumerRecord[_, _]): OffsetCommitState =
copy(
offsets = offsets + (new TopicPartition(
record.topic,
record.partition,
) -> record.offset),
recordCount = recordCount + 1,
)

def needToCommit(
maxRecordCount: Long,
now: FiniteDuration,
maxElapsedTime: FiniteDuration,
): Option[Map[TopicPartition, OffsetAndMetadata]] = {
(recordCount >= maxRecordCount || (now - lastCommitTime) >= maxElapsedTime)
.guard[Option]
.as(nextOffsets)
}

def nextOffsets: Map[TopicPartition, OffsetAndMetadata] =
offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap

def reset(time: FiniteDuration): OffsetCommitState =
OffsetCommitState.empty(time)
}

object OffsetCommitState {
val empty = OffsetCommitState(
offsets = Map.empty,
recordCount = 0L,
lastCommitTime = 0.millis,
)
def empty(time: FiniteDuration): OffsetCommitState =
empty.copy(
lastCommitTime = time
)
}

}
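
For context, a minimal usage sketch of processingAndCommitting outside the test suite. This is not part of the diff: the bootstrap server, topic, group id, and handleRecord function are placeholders, and the configuration mirrors the ConsumerApi setup used in the tests below, with auto-commit disabled as the Scaladoc requires.

// Hypothetical usage sketch, not part of this PR.
import cats.effect.{IO, IOApp}
import cats.syntax.all.*
import scala.concurrent.duration.*
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.banno.kafka.*
import com.banno.kafka.consumer.*
import natchez.Trace.Implicits.noop

object ProcessingAndCommittingExample extends IOApp.Simple {

  // Placeholder processing function.
  def handleRecord(r: ConsumerRecord[Int, Int]): IO[Unit] = IO.unit

  def run: IO[Unit] =
    ConsumerApi
      .resource[IO, Int, Int](
        BootstrapServers("localhost:9092"),
        GroupId("example-group"),
        AutoOffsetReset.earliest,
        EnableAutoCommit(false), // required: the stream commits offsets itself
      )
      .use { consumer =>
        consumer.subscribe("example-topic") *>
          consumer
            .processingAndCommitting(
              pollTimeout = 100.millis,
              maxRecordCount = 1000L,
              maxElapsedTime = 60.seconds,
            )(handleRecord)
            .compile
            .drain
      }
}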
45 changes: 45 additions & 0 deletions core/src/test/scala/com/banno/kafka/KafkaSpec.scala
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.Sync
import org.scalacheck.Gen
import org.apache.kafka.clients.admin.NewTopic
import com.banno.kafka.admin.AdminApi

trait KafkaSpec {

val bootstrapServer = "localhost:9092"
val schemaRegistryUrl = "http://localhost:8091"

def randomId: String =
Gen.listOfN(10, Gen.alphaChar).map(_.mkString).sample.get
def genGroupId: String = randomId
def genTopic: String = randomId

def createTestTopic[F[_]: Sync](partitionCount: Int = 1): F[String] = {
val topicName = genTopic
AdminApi
.createTopicsIdempotent[F](
bootstrapServer,
List(new NewTopic(topicName, partitionCount, 1.toShort)),
)
.as(topicName)
}

}
225 changes: 225 additions & 0 deletions core/src/test/scala/com/banno/kafka/ProcessingAndCommittingSpec.scala
@@ -0,0 +1,225 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.IO
import fs2.Stream
import munit.CatsEffectSuite
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import com.banno.kafka.producer.*
import com.banno.kafka.consumer.*
import scala.concurrent.duration.*
import natchez.Trace.Implicits.noop

class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {

def offsets(
p: TopicPartition,
o: Long,
): Map[TopicPartition, OffsetAndMetadata] =
Map(p -> new OffsetAndMetadata(o))

val empty = Map.empty[TopicPartition, OffsetAndMetadata]

test("processingAndCommitting commits after number of records") {
ProducerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer)
)
.use { producer =>
ConsumerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
EnableAutoCommit(false),
)
.use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
ps = Set(p)
values = (0 to 9).toList
_ <- producer.sendAsyncBatch(
values.map(v => new ProducerRecord(topic, v, v))
)
() <- consumer.subscribe(topic)
c0 <- consumer.partitionQueries.committed(ps)
pac = consumer.processingAndCommitting(
pollTimeout = 100.millis,
maxRecordCount = 2,
maxElapsedTime = Long.MaxValue.nanos,
)(_.value.pure[IO])
committed = Stream.repeatEval(
consumer.partitionQueries.committed(ps)
)
results <- pac
.take(values.size.toLong)
.interleave(committed)
.compile
.toList
} yield {
assertEquals(c0, empty)
assertEquals(results.size, values.size * 2)
// TODO rewrite this to use values, not so hard-coded
assertEquals(results(0), 0)
assertEquals(results(1), empty)
assertEquals(results(2), 1)
assertEquals(results(3), offsets(p, 2))
assertEquals(results(4), 2)
assertEquals(results(5), offsets(p, 2))
assertEquals(results(6), 3)
assertEquals(results(7), offsets(p, 4))
assertEquals(results(8), 4)
assertEquals(results(9), offsets(p, 4))
assertEquals(results(10), 5)
assertEquals(results(11), offsets(p, 6))
assertEquals(results(12), 6)
assertEquals(results(13), offsets(p, 6))
assertEquals(results(14), 7)
assertEquals(results(15), offsets(p, 8))
assertEquals(results(16), 8)
assertEquals(results(17), offsets(p, 8))
assertEquals(results(18), 9)
assertEquals(results(19), offsets(p, 10))
}
}
}
}

// this has a real danger of becoming a "flaky test" due to its timing assumptions
test("processingAndCommitting commits after elapsed time") {
ProducerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer)
)
.use { producer =>
ConsumerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
EnableAutoCommit(false),
)
.use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
ps = Set(p)
values = (0 to 9).toList
_ <- producer.sendAsyncBatch(
values.map(v => new ProducerRecord(topic, v, v))
)
() <- consumer.subscribe(topic)
c0 <- consumer.partitionQueries.committed(ps)
pac = consumer.processingAndCommitting(
pollTimeout = 100.millis,
maxRecordCount = Long.MaxValue,
maxElapsedTime = 200.millis,
)(r => IO.sleep(101.millis).as(r.value))
committed = Stream.repeatEval(
consumer.partitionQueries.committed(ps)
)
results <- pac
.take(values.size.toLong)
.interleave(committed)
.compile
.toList
} yield {
assertEquals(c0, empty)
assertEquals(results.size, values.size * 2)
// TODO rewrite this to use values, not so hard-coded
assertEquals(results(0), 0)
assertEquals(results(1), empty)
assertEquals(results(2), 1)
assertEquals(results(3), offsets(p, 2))
assertEquals(results(4), 2)
assertEquals(results(5), offsets(p, 2))
assertEquals(results(6), 3)
assertEquals(results(7), offsets(p, 4))
assertEquals(results(8), 4)
assertEquals(results(9), offsets(p, 4))
assertEquals(results(10), 5)
assertEquals(results(11), offsets(p, 6))
assertEquals(results(12), 6)
assertEquals(results(13), offsets(p, 6))
assertEquals(results(14), 7)
assertEquals(results(15), offsets(p, 8))
assertEquals(results(16), 8)
assertEquals(results(17), offsets(p, 8))
assertEquals(results(18), 9)
assertEquals(results(19), offsets(p, 10))
}
}
}
}

case class CommitOnFailureException()
extends RuntimeException("Commit on failure exception")

test("on failure, commits successful offsets, but not the failed offset") {
ProducerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer)
)
.use { producer =>
ConsumerApi
.resource[IO, Int, Int](
BootstrapServers(bootstrapServer),
GroupId(genGroupId),
AutoOffsetReset.earliest,
EnableAutoCommit(false),
)
.use { consumer =>
for {
topic <- createTestTopic[IO]()
p = new TopicPartition(topic, 0)
ps = Set(p)
values = (0 to 9).toList
throwOn = 7
_ <- producer.sendAsyncBatch(
values.map(v => new ProducerRecord(topic, v, v))
)
() <- consumer.subscribe(topic)
c0 <- consumer.partitionQueries.committed(ps)
pac = consumer.processingAndCommitting(
pollTimeout = 100.millis,
maxRecordCount = Long.MaxValue,
maxElapsedTime = Long.MaxValue.nanos,
) { r =>
val v = r.value
if (v == throwOn)
IO.raiseError(CommitOnFailureException())
else
v.pure[IO]
}
results <- pac.compile.toList.attempt
c1 <- consumer.partitionQueries.committed(ps)
} yield {
assertEquals(c0, empty)
assertEquals(results, Left(CommitOnFailureException()))
// on failure, the committed offset should be the one that failed, so processing will resume there next time and try again
assertEquals(c1, offsets(p, throwOn.toLong))
}
}
}
}

}