Auto offset commits (the right way) #782
@@ -353,4 +353,93 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {

```scala
      .recordStream(pollTimeout)
      .evalMap(r => process(r) <* consumer.commitSync(r.nextOffset))

  /** Returns a stream that processes records using the specified function,
    * committing offsets for successfully processed records, either after
    * processing the specified number of records, or after the specified time
    * has elapsed since the last offset commit. If the processing function
    * returns a failure, offsets for records successfully processed before
    * that failure will be committed, and then the stream will halt with that
    * failure. This is at-least-once processing: after a restart, the record
    * that failed will be reprocessed. In some use cases this pattern is more
    * appropriate than just using auto-offset-commits, since it will not
    * commit offsets for failed records when the consumer is closed. The
    * consumer must be configured to disable offset auto-commits.
    */
  def processingAndCommitting[A](
      pollTimeout: FiniteDuration,
      maxRecordCount: Long = 1000L,
      maxElapsedTime: FiniteDuration = 60.seconds,
  )(
      process: ConsumerRecord[K, V] => F[A]
  )(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
    for {
      state <- Stream.eval(
        C.monotonic.flatMap(now =>
          Ref.of[F, OffsetCommitState](OffsetCommitState.empty(now))
        )
      )
      record <- consumer.recordStream(pollTimeout)
      a <- Stream.eval(
        process(record).onError(_ =>
          // we can still commit offsets that were successfully processed,
          // before this stream halts, consumer is closed, etc.
          // this is still at-least-once processing, but minimizes the
          // amount of reprocessing after restart
          state.get.flatMap(s => consumer.commitSync(s.nextOffsets))
        )
      )
      s <- Stream.eval(state.updateAndGet(_.update(record)))
      now <- Stream.eval(C.monotonic)
      () <- Stream.eval(
        s.needToCommit(maxRecordCount, now, maxElapsedTime)
          .traverse_(os =>
            consumer.commitSync(os) *>
              state.update(_.reset(now))
          )
      )
    } yield a
```

Review thread on `maxRecordCount: Long = 1000L`:

> **Reviewer:** If someone passes in a negative number, or a negative time, would anything unexpected happen? I am not sure how…
>
> **Author:** Good point. This probably needs more validation of inputs.
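Following up on that thread: with the code as written, a non-positive `maxRecordCount` or a negative `maxElapsedTime` is not catastrophic, it just makes `needToCommit` true for every record, degrading the stream to a synchronous commit per record. A minimal fail-fast sketch, assuming it would sit next to `processingAndCommitting` inside `ConsumerOps` (hypothetical, not part of the PR):

```scala
// Hypothetical sketch: reject non-positive thresholds up front instead of
// silently committing after every record.
def processingAndCommittingValidated[A](
    pollTimeout: FiniteDuration,
    maxRecordCount: Long = 1000L,
    maxElapsedTime: FiniteDuration = 60.seconds,
)(
    process: ConsumerRecord[K, V] => F[A]
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
  if (maxRecordCount <= 0L || maxElapsedTime <= Duration.Zero)
    Stream.raiseError[F](
      new IllegalArgumentException(
        s"maxRecordCount ($maxRecordCount) and maxElapsedTime ($maxElapsedTime) must be positive"
      )
    )
  else
    processingAndCommitting(pollTimeout, maxRecordCount, maxElapsedTime)(process)
```

The diff continues with the offset-tracking state used above: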
```scala
  case class OffsetCommitState(
      offsets: Map[TopicPartition, Long],
      recordCount: Long,
      lastCommitTime: FiniteDuration,
  ) {
    def update(record: ConsumerRecord[_, _]): OffsetCommitState =
      copy(
        offsets = offsets + (new TopicPartition(
          record.topic,
          record.partition,
        ) -> record.offset),
        recordCount = recordCount + 1,
      )

    def needToCommit(
        maxRecordCount: Long,
        now: FiniteDuration,
        maxElapsedTime: FiniteDuration,
    ): Option[Map[TopicPartition, OffsetAndMetadata]] =
      (recordCount >= maxRecordCount || (now - lastCommitTime) >= maxElapsedTime)
        .guard[Option]
        .as(nextOffsets)

    // Kafka expects the committed offset to be that of the next record to
    // consume, hence the + 1
    def nextOffsets: Map[TopicPartition, OffsetAndMetadata] =
      offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap

    def reset(time: FiniteDuration): OffsetCommitState =
      OffsetCommitState.empty(time)
  }

  object OffsetCommitState {
    val empty = OffsetCommitState(
      offsets = Map.empty,
      recordCount = 0L,
      lastCommitTime = 0.millis,
    )
    def empty(time: FiniteDuration): OffsetCommitState =
      empty.copy(
        lastCommitTime = time
      )
  }

}
```
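For orientation, a usage sketch under assumed names (`myConsumer` is an already-subscribed `ConsumerApi[IO, K, V]` and `handle` a placeholder processing effect; neither is from the PR). The tests below drive the method the same way:

```scala
// Hypothetical usage: records are processed as they arrive; offsets are
// committed every 500 records or every 30 seconds, whichever comes first.
val processed: Stream[IO, Unit] =
  myConsumer.processingAndCommitting(
    pollTimeout = 100.millis,
    maxRecordCount = 500L,
    maxElapsedTime = 30.seconds,
  )(record => handle(record)) // handle: ConsumerRecord[K, V] => IO[Unit]
```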
The PR also adds a test helper trait:

@@ -0,0 +1,45 @@

```scala
/*
 * Copyright 2019 Jack Henry & Associates, Inc.®
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.Sync
import org.scalacheck.Gen
import org.apache.kafka.clients.admin.NewTopic
import com.banno.kafka.admin.AdminApi

trait KafkaSpec {

  val bootstrapServer = "localhost:9092"
  val schemaRegistryUrl = "http://localhost:8091"

  // .sample.get is safe here: alphaChar generation always succeeds
  def randomId: String =
    Gen.listOfN(10, Gen.alphaChar).map(_.mkString).sample.get
  def genGroupId: String = randomId
  def genTopic: String = randomId

  def createTestTopic[F[_]: Sync](partitionCount: Int = 1): F[String] = {
    val topicName = genTopic
    AdminApi
      .createTopicsIdempotent[F](
        bootstrapServer,
        List(new NewTopic(topicName, partitionCount, 1.toShort)),
      )
      .as(topicName)
  }

}
```
And a test suite for the new method:

@@ -0,0 +1,225 @@

```scala
/*
 * Copyright 2019 Jack Henry & Associates, Inc.®
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.IO
import fs2.Stream
import munit.CatsEffectSuite
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import com.banno.kafka.producer.*
import com.banno.kafka.consumer.*
import scala.concurrent.duration.*
import natchez.Trace.Implicits.noop

class ProcessingAndCommittingSpec extends CatsEffectSuite with KafkaSpec {

  def offsets(
      p: TopicPartition,
      o: Long,
  ): Map[TopicPartition, OffsetAndMetadata] =
    Map(p -> new OffsetAndMetadata(o))

  val empty = Map.empty[TopicPartition, OffsetAndMetadata]

  test("processingAndCommitting commits after number of records") {
    ProducerApi
      .resource[IO, Int, Int](
        BootstrapServers(bootstrapServer)
      )
      .use { producer =>
        ConsumerApi
          .resource[IO, Int, Int](
            BootstrapServers(bootstrapServer),
            GroupId(genGroupId),
            AutoOffsetReset.earliest,
            EnableAutoCommit(false),
          )
          .use { consumer =>
            for {
              topic <- createTestTopic[IO]()
              p = new TopicPartition(topic, 0)
              ps = Set(p)
              values = (0 to 9).toList
              _ <- producer.sendAsyncBatch(
                values.map(v => new ProducerRecord(topic, v, v))
              )
              () <- consumer.subscribe(topic)
              c0 <- consumer.partitionQueries.committed(ps)
              pac = consumer.processingAndCommitting(
                pollTimeout = 100.millis,
                maxRecordCount = 2,
                maxElapsedTime = Long.MaxValue.nanos,
              )(_.value.pure[IO])
              committed = Stream.repeatEval(
                consumer.partitionQueries.committed(ps)
              )
              results <- pac
                .take(values.size.toLong)
                .interleave(committed)
                .compile
                .toList
            } yield {
              assertEquals(c0, empty)
              assertEquals(results.size, values.size * 2)
              // TODO rewrite this to use values, not so hard-coded
              assertEquals(results(0), 0)
              assertEquals(results(1), empty)
              assertEquals(results(2), 1)
              assertEquals(results(3), offsets(p, 2))
              assertEquals(results(4), 2)
              assertEquals(results(5), offsets(p, 2))
              assertEquals(results(6), 3)
              assertEquals(results(7), offsets(p, 4))
              assertEquals(results(8), 4)
              assertEquals(results(9), offsets(p, 4))
              assertEquals(results(10), 5)
              assertEquals(results(11), offsets(p, 6))
              assertEquals(results(12), 6)
              assertEquals(results(13), offsets(p, 6))
              assertEquals(results(14), 7)
              assertEquals(results(15), offsets(p, 8))
              assertEquals(results(16), 8)
              assertEquals(results(17), offsets(p, 8))
              assertEquals(results(18), 9)
              assertEquals(results(19), offsets(p, 10))
            }
          }
      }
  }

  // this has a real danger of becoming a "flaky test" due to its timing assumptions
  test("processingAndCommitting commits after elapsed time") {
    ProducerApi
      .resource[IO, Int, Int](
        BootstrapServers(bootstrapServer)
      )
      .use { producer =>
        ConsumerApi
          .resource[IO, Int, Int](
            BootstrapServers(bootstrapServer),
            GroupId(genGroupId),
            AutoOffsetReset.earliest,
            EnableAutoCommit(false),
          )
          .use { consumer =>
            for {
              topic <- createTestTopic[IO]()
              p = new TopicPartition(topic, 0)
              ps = Set(p)
              values = (0 to 9).toList
              _ <- producer.sendAsyncBatch(
                values.map(v => new ProducerRecord(topic, v, v))
              )
              () <- consumer.subscribe(topic)
              c0 <- consumer.partitionQueries.committed(ps)
              pac = consumer.processingAndCommitting(
                pollTimeout = 100.millis,
                maxRecordCount = Long.MaxValue,
                maxElapsedTime = 200.millis,
              )(r => IO.sleep(101.millis).as(r.value))
              committed = Stream.repeatEval(
                consumer.partitionQueries.committed(ps)
              )
              results <- pac
                .take(values.size.toLong)
                .interleave(committed)
                .compile
                .toList
            } yield {
              assertEquals(c0, empty)
              assertEquals(results.size, values.size * 2)
              // TODO rewrite this to use values, not so hard-coded
              assertEquals(results(0), 0)
              assertEquals(results(1), empty)
              assertEquals(results(2), 1)
              assertEquals(results(3), offsets(p, 2))
              assertEquals(results(4), 2)
              assertEquals(results(5), offsets(p, 2))
              assertEquals(results(6), 3)
              assertEquals(results(7), offsets(p, 4))
              assertEquals(results(8), 4)
              assertEquals(results(9), offsets(p, 4))
              assertEquals(results(10), 5)
              assertEquals(results(11), offsets(p, 6))
              assertEquals(results(12), 6)
              assertEquals(results(13), offsets(p, 6))
              assertEquals(results(14), 7)
              assertEquals(results(15), offsets(p, 8))
              assertEquals(results(16), 8)
              assertEquals(results(17), offsets(p, 8))
              assertEquals(results(18), 9)
              assertEquals(results(19), offsets(p, 10))
            }
          }
      }
  }

  case class CommitOnFailureException()
      extends RuntimeException("Commit on failure exception")

  test("on failure, commits successful offsets, but not the failed offset") {
    ProducerApi
      .resource[IO, Int, Int](
        BootstrapServers(bootstrapServer)
      )
      .use { producer =>
        ConsumerApi
          .resource[IO, Int, Int](
            BootstrapServers(bootstrapServer),
            GroupId(genGroupId),
            AutoOffsetReset.earliest,
            EnableAutoCommit(false),
          )
          .use { consumer =>
            for {
              topic <- createTestTopic[IO]()
              p = new TopicPartition(topic, 0)
              ps = Set(p)
              values = (0 to 9).toList
              throwOn = 7
              _ <- producer.sendAsyncBatch(
                values.map(v => new ProducerRecord(topic, v, v))
              )
              () <- consumer.subscribe(topic)
              c0 <- consumer.partitionQueries.committed(ps)
              pac = consumer.processingAndCommitting(
                pollTimeout = 100.millis,
                maxRecordCount = Long.MaxValue,
                maxElapsedTime = Long.MaxValue.nanos,
              ) { r =>
                val v = r.value
                if (v == throwOn)
                  IO.raiseError(CommitOnFailureException())
                else
                  v.pure[IO]
              }
              results <- pac.compile.toList.attempt
              c1 <- consumer.partitionQueries.committed(ps)
            } yield {
              assertEquals(c0, empty)
              assertEquals(results, Left(CommitOnFailureException()))
              // on failure, the committed offset should be the one that
              // failed, so processing will resume there next time and try
              // again
              assertEquals(c1, offsets(p, throwOn.toLong))
            }
          }
      }
  }

}
```
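One way to discharge the TODOs in the two cadence tests (a hypothetical sketch, not part of the PR): the produced values are 0..9 on a single partition, so each record's offset equals its value, and in both tests a commit lands after every 2 processed records. Modulo the interleave timing the tests already rely on, the expected list can then be derived from `values` instead of being spelled out index by index:

```scala
// Hypothetical helper, inside ProcessingAndCommittingSpec: after the
// (v + 1)-th processed record, the committed position is the largest
// multiple of `batch` at or below v + 1 (no commit before the first
// full batch, hence `empty`).
def expectedResults(
    values: List[Int],
    p: TopicPartition,
    batch: Int,
): List[Any] =
  values.flatMap { v =>
    val committedUpTo = ((v + 1) / batch) * batch
    val c =
      if (committedUpTo == 0) empty
      else offsets(p, committedUpTo.toLong)
    List(v, c)
  }

// then, in both tests:
// assertEquals(results, expectedResults(values, p, batch = 2))
```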
Review thread:

> **Reviewer:** Does `Stream.groupWithin` help here? It emits chunks on the maxRecordCount-and-maxElapsedTime cadence.
>
> **Author:** TIL `Stream.groupWithin`. Reading its docs, IIUC it buffers up elements of the input stream until it obtains a certain number of them or the timeout is exceeded. For a Kafka consumer, I don't think we'd want that behavior, since it would introduce delay. We want the elements to flow from the input stream as they're read from Kafka, and to take an action (i.e. commit offsets) after a number of records or a timeout. `Stream.groupWithin` is close to that, but if it delays records going to the `process` function we wouldn't want that.
>
> **Reviewer:** Not if it came after L390, but I would need to think about the `onError` on line 385. Don't let this block it, but I may play type tetris later.
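To make the buffering concern concrete, a sketch of the `groupWithin` shape under placeholder names (`process`, `commitChunk`, and the record type are illustrative, not from the PR):

```scala
import cats.effect.IO
import cats.syntax.all.*
import fs2.{Chunk, Stream}
import org.apache.kafka.clients.consumer.ConsumerRecord
import scala.concurrent.duration.*

// placeholders standing in for the real processing and commit effects
def process(r: ConsumerRecord[String, String]): IO[Unit] = IO.unit
def commitChunk(c: Chunk[ConsumerRecord[String, String]]): IO[Unit] = IO.unit

// With groupWithin, `process` does not see a record until its chunk closes
// (chunk full or timeout elapsed) -- the delay described above.
// processingAndCommitting instead processes each record immediately and
// applies the count/elapsed-time cadence only to the offset commit.
def viaGroupWithin(
    records: Stream[IO, ConsumerRecord[String, String]]
): Stream[IO, Unit] =
  records
    .groupWithin(1000, 60.seconds)
    .evalMap(chunk => chunk.traverse_(process) *> commitChunk(chunk))
```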