Merge pull request #845 from Banno/backport
Back-port `processingAndCommitting`
amohrland authored Jan 16, 2024
2 parents 81c2767 + 6856517 commit 8b26776
Showing 3 changed files with 413 additions and 0 deletions.
91 changes: 91 additions & 0 deletions core/src/main/scala/com/banno/kafka/consumer/ConsumerOps.scala
@@ -353,4 +353,95 @@ case class ConsumerOps[F[_], K, V](consumer: ConsumerApi[F, K, V]) {
.recordStream(pollTimeout)
.evalMap(r => process(r) <* consumer.commitSync(r.nextOffset))

/** Returns a stream that processes records using the specified function,
* committing offsets for successfully processed records, either after
* processing the specified number of records, or after the specified time
* has elapsed since the last offset commit. On any stream finalization,
* whether success, error, or cancelation, offsets will be committed. This is
* at-least-once processing: after a restart, the record that failed will be
* reprocessed. In some use cases this pattern is more appropriate than just
* using auto-offset-commits, since it will not commit offsets for failed
* records when the consumer is closed. The consumer must be configured to
* disable offset auto-commits, i.e. `enable.auto.commit=false`.
*/
def processingAndCommitting[A](
pollTimeout: FiniteDuration,
maxRecordCount: Long = 1000L,
maxElapsedTime: FiniteDuration = 60.seconds,
)(
process: ConsumerRecord[K, V] => F[A]
)(implicit C: Clock[F], S: Concurrent[F]): Stream[F, A] =
Stream
.eval(
C.monotonic
.flatMap(now =>
Ref.of[F, OffsetCommitState](OffsetCommitState.empty(now))
)
)
.flatMap { state =>
def commit(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
consumer.commitSync(offsets)
val commitNextOffsets: F[Unit] =
state.get.map(_.nextOffsets).flatMap(commit)
consumer
.recordStream(pollTimeout)
.evalMap { record =>
for {
a <- process(record)
s <- state.updateAndGet(_.update(record))
now <- C.monotonic
() <- s
.needToCommit(maxRecordCount, now, maxElapsedTime)
.traverse_(os =>
commit(os) *>
state.update(_.reset(now))
)
} yield a
}
.onFinalize(commitNextOffsets)
}

case class OffsetCommitState(
offsets: Map[TopicPartition, Long],
recordCount: Long,
lastCommitTime: FiniteDuration,
) {
def update(record: ConsumerRecord[_, _]): OffsetCommitState =
copy(
offsets = offsets + (new TopicPartition(
record.topic,
record.partition,
) -> record.offset),
recordCount = recordCount + 1,
)

def needToCommit(
maxRecordCount: Long,
now: FiniteDuration,
maxElapsedTime: FiniteDuration,
): Option[Map[TopicPartition, OffsetAndMetadata]] = {
(recordCount >= maxRecordCount || (now - lastCommitTime) >= maxElapsedTime)
.guard[Option]
.as(nextOffsets)
}

def nextOffsets: Map[TopicPartition, OffsetAndMetadata] =
offsets.view.mapValues(o => new OffsetAndMetadata(o + 1)).toMap

def reset(time: FiniteDuration): OffsetCommitState =
OffsetCommitState.empty(time)
}

object OffsetCommitState {
val empty = OffsetCommitState(
offsets = Map.empty,
recordCount = 0L,
lastCommitTime = 0.millis,
)
def empty(time: FiniteDuration): OffsetCommitState =
empty.copy(
lastCommitTime = time
)
}

}
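
As a rough illustration of how the back-ported method might be wired up (not part of the diff): a minimal sketch assuming cats-effect `IO`, an existing `ConsumerApi[IO, String, String]` named `consumer` that is already subscribed to its topics and configured with `enable.auto.commit=false`, and a hypothetical `handle` function. The case class `ConsumerOps` from the diff is used directly; the library may also expose the same method through syntax on `ConsumerApi`.

import scala.concurrent.duration._
import cats.effect.IO
import org.apache.kafka.clients.consumer.ConsumerRecord
import com.banno.kafka.consumer.{ConsumerApi, ConsumerOps}

// Hypothetical record handler: any ConsumerRecord[K, V] => F[A] works here.
def handle(record: ConsumerRecord[String, String]): IO[Unit] =
  IO(println(s"${record.topic}-${record.partition}@${record.offset}: ${record.value}"))

// `consumer` is an assumed, already-subscribed ConsumerApi[IO, String, String]
// with offset auto-commits disabled, as the Scaladoc requires.
def processAll(consumer: ConsumerApi[IO, String, String]): IO[Unit] =
  ConsumerOps(consumer)
    .processingAndCommitting(
      pollTimeout = 1.second,
      maxRecordCount = 500L,       // commit after 500 successfully processed records...
      maxElapsedTime = 30.seconds, // ...or 30 seconds after the last commit, whichever comes first
    )(handle)
    .compile
    .drain

On finalization of the returned stream (success, error, or cancelation), offsets for records processed so far are committed, giving the at-least-once behavior described in the Scaladoc.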
45 changes: 45 additions & 0 deletions core/src/test/scala/com/banno/kafka/KafkaSpec.scala
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka

import cats.syntax.all.*
import cats.effect.Sync
import org.scalacheck.Gen
import org.apache.kafka.clients.admin.NewTopic
import com.banno.kafka.admin.AdminApi

trait KafkaSpec {

val bootstrapServer = "localhost:9092"
val schemaRegistryUrl = "http://localhost:8091"

def randomId: String =
Gen.listOfN(10, Gen.alphaChar).map(_.mkString).sample.get
def genGroupId: String = randomId
def genTopic: String = randomId

def createTestTopic[F[_]: Sync](partitionCount: Int = 1): F[String] = {
val topicName = genTopic
AdminApi
.createTopicsIdempotent[F](
bootstrapServer,
List(new NewTopic(topicName, partitionCount, 1.toShort)),
)
.as(topicName)
}

}
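
A hypothetical sketch of how a test might use this new helper trait (not part of the diff): it assumes a broker running at `bootstrapServer` and uses `IO`, which satisfies the `Sync[F]` constraint on `createTestTopic`. The object name is illustrative only.

import cats.effect.IO
import com.banno.kafka.KafkaSpec

object ExampleKafkaSpec extends KafkaSpec {
  // Idempotently creates a randomly named topic with 3 partitions on the
  // local broker and yields its name for use in a test.
  val newTopic: IO[String] = createTestTopic[IO](partitionCount = 3)
}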
(Third changed file not expanded in this view.)
