Skip to content

Commit

Permalink
Merge pull request #548 from Banno/auto-offset-reset-none
Browse files Browse the repository at this point in the history
feat: default to `AutoOffsetReset.none` in `RecordStream`
  • Loading branch information
Kazark authored Dec 21, 2021
2 parents 53ee981 + a7d8aac commit 67b1e87
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ lazy val commonSettings = Seq(
),
Test / sourceGenerators += (Test / avroScalaGenerate).taskValue,
watchSources ++= ((Test / avroSourceDirectories).value ** "*.avdl").get,
testOptions in Test += Tests.Argument(TestFrameworks.ScalaTest, "-oS"),
Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oS"),
)

lazy val contributors = Seq(
Expand Down
34 changes: 14 additions & 20 deletions core/src/main/scala/com/banno/kafka/RecordStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ object RecordStream {
}

sealed trait ConfigStage2[P[_[_], _]] {
def autoOffsetResetLatest: ConfigStage2[P]
def autoOffsetResetEarliest: ConfigStage2[P]
def untypedExtras(configs: Seq[(String, AnyRef)]): ConfigStage2[P]

def batched: ConfigStage3[P, IncomingRecords]
Expand All @@ -357,25 +359,11 @@ object RecordStream {
clientId: String,
whetherCommits: WhetherCommits[P],
extraConfigs: Seq[(String, AnyRef)] = Seq.empty,
reset: AutoOffsetReset = AutoOffsetReset.none,
) extends ConfigStage2[P] {
def consumerApiV2[F[_]: Async]: Resource[F, ConsumerApi[F, GenericRecord, GenericRecord]] = {
val configs: List[(String, AnyRef)] =
whetherCommits.configs ++
extraConfigs ++
List(
kafkaBootstrapServers,
schemaRegistryUri,
EnableAutoCommit(false),
IsolationLevel.ReadCommitted,
ClientId(clientId),
MetricReporters[ConsumerPrometheusReporter],
)
ConsumerApi.Avro.Generic.resource[F](configs: _*)
}

def consumerApi[F[_]: Async](
reset: AutoOffsetReset
): Resource[F, ConsumerApi[F, GenericRecord, GenericRecord]] = {
def consumerApi[
F[_]: Async
]: Resource[F, ConsumerApi[F, GenericRecord, GenericRecord]] = {
val configs: List[(String, AnyRef)] =
whetherCommits.configs ++
extraConfigs ++
Expand All @@ -391,6 +379,12 @@ object RecordStream {
ConsumerApi.Avro.Generic.resource[F](configs: _*)
}

override def autoOffsetResetLatest: ConfigStage2[P] =
copy(reset = AutoOffsetReset.latest)

override def autoOffsetResetEarliest: ConfigStage2[P] =
copy(reset = AutoOffsetReset.earliest)

override def untypedExtras(configs: Seq[(String, AnyRef)]) =
copy(extraConfigs = configs)

Expand Down Expand Up @@ -528,7 +522,7 @@ object RecordStream {
topical: Topical[A, B],
reset: AutoOffsetReset,
): Resource[F, RecordStream[F, IncomingRecords[A]]] =
baseConfigs.consumerApi(reset).evalMap { consumer =>
baseConfigs.copy(reset=reset).consumerApi.evalMap { consumer =>
consumer
.subscribe(topical.names.map(_.show).toList)
.as(recordStream(consumer, topical))
Expand Down Expand Up @@ -575,7 +569,7 @@ object RecordStream {
streamSelectorViaConsumer(baseConfigs.whetherCommits, topical).mapK(
new ~>[NeedsConsumer[F, *], Resource[F, *]] {
override def apply[A](fa: NeedsConsumer[F, A]): Resource[F, A] =
baseConfigs.consumerApiV2[F].evalMap { consumer =>
baseConfigs.consumerApi[F].evalMap { consumer =>
assign(consumer, topical, seekToF)
.as(fa(consumer))
}
Expand Down

0 comments on commit 67b1e87

Please sign in to comment.