[POC - 3] New commit interface #1042

Draft · wants to merge 14 commits into master
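Read together, the diffs below show the shape of the POC's new commit interface: consumer streams emit plain Apache Kafka `ConsumerRecord` values instead of zio-kafka's `CommittableRecord`, commits go through a new `Consumer.commit` accessor that takes the records themselves, and `Offset`/`OffsetBatch` move to a new `zio.kafka.consumer.types` module. A minimal sketch of the per-record style, inferred from the test changes in this PR (the topic name is hypothetical and the exact signatures are assumptions, not the released API):

```scala
import zio.ZIO
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

// New interface per this POC: commit by passing the record to Consumer.commit,
// replacing the old record.offset.commit on a CommittableRecord.
val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
    .foreach(Consumer.commit) // old interface: .foreach(_.offset.commit)
```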
.github/workflows/ci.yml (8 additions, 8 deletions)

@@ -23,7 +23,7 @@ jobs:
     continue-on-error: true
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Install libuv
@@ -48,7 +48,7 @@ jobs:
     continue-on-error: false
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Install libuv
@@ -88,7 +88,7 @@ jobs:
       - name: Cache Dependencies
         uses: coursier/cache-action@v6
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Test
@@ -100,7 +100,7 @@ jobs:
     if: ${{ github.event_name == 'push' }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Install libuv
@@ -129,7 +129,7 @@ jobs:
           app_private_key: ${{ secrets.APP_PRIVATE_KEY }}
       - name: Create Pull Request
         id: cpr
-        uses: peter-evans/create-pull-request@v5.0.0
+        uses: peter-evans/create-pull-request@v5.0.2
         with:
           body: |-
             Autogenerated changes after running the `sbt docs/generateReadme` command of the [zio-sbt-website](https://zio.dev/zio-sbt) plugin.
@@ -174,7 +174,7 @@ jobs:
     if: ${{ github.event_name != 'pull_request' }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Install libuv
@@ -203,7 +203,7 @@ jobs:
     if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: Install libuv
@@ -234,7 +234,7 @@ jobs:
     if: ${{ (github.event_name == 'release') && (github.event.action == 'published') }}
     steps:
       - name: Git Checkout
-        uses: actions/checkout@v3.6.0
+        uses: actions/checkout@v4.0.0
         with:
           fetch-depth: '0'
       - name: notify the main repo about the new release of docs package
build.sbt (4 additions, 2 deletions)

@@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients"
 lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0"
 lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2"
 lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11"
+lazy val zioPrelude = "dev.zio" %% "zio-prelude" % "1.0.0-RC20"

 enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

@@ -77,8 +78,8 @@ lazy val root = project
 def stdSettings(prjName: String) = Seq(
   name := s"$prjName",
   scalafmtOnCompile := !insideCI.value,
-  Compile / compile / scalacOptions ++=
-    optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value,
+  scalacOptions ++= optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value,
+  scalacOptions ++= optionsOn("2.12")("-Xfuture", "-Xsource:2.13").value,
   scalacOptions -= "-Xlint:infer-any",
   // workaround for bad constant pool issue
   (Compile / doc) := Def.taskDyn {
@@ -102,6 +103,7 @@ lazy val zioKafka =
     .settings(enableZIO(enableStreaming = true))
     .settings(
       libraryDependencies ++= Seq(
+        zioPrelude,
         kafkaClients,
         jacksonDatabind,
         scalaCollectionCompat
project/plugins.sbt (1 addition, 1 deletion)

@@ -1,4 +1,4 @@
-val zioSbtVersion = "0.4.0-alpha.17"
+val zioSbtVersion = "0.4.0-alpha.18"

 addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
 addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
@@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.openjdk.jmh.annotations._
 import zio.kafka.bench.ZioBenchmark.randomThing
 import zio.kafka.consumer.diagnostics.Diagnostics
-import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
 import zio.kafka.producer.Producer
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.Kafka
 import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer }
-import zio.stream.ZSink
-import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer }
+import zio.{ Ref, ZIO, ZLayer }

 import java.util.concurrent.TimeUnit

@@ -60,18 +59,15 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
   def throughputWithCommits(): Any = runZIO {
     for {
       counter <- Ref.make(0)
-      _ <- ZIO.logAnnotate("consumer", "1") {
-             Consumer
-               .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
-               .map(_.offset)
-               .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
-               .tap(batch => counter.update(_ + batch.size))
-               .map(OffsetBatch.apply)
-               .mapZIO(_.commit)
-               .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
-               .runDrain
-               .provideSome[Kafka](env)
-           }
+      _ <- ZIO
+             .logAnnotate("consumer", "1") {
+               Consumer
+                 .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
+                 .mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(records))
+                 .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
+                 .runDrain
+                 .provideSome[Kafka](env)
+             }
     } yield ()
   }
 }
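The rewritten benchmark also shows the batching story getting simpler: where the old code collected `Offset`s with `ZSink.collectAll` on a 100 ms schedule, wrapped them in an `OffsetBatch`, and committed that, the new code commits each chunk of records as it flows through. A standalone sketch of that chunk-wise pattern, under the same assumed API (topic name and counter are illustrative):

```scala
import zio.{ Ref, ZIO }
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde

// Commit once per stream chunk; the stream's natural chunking replaces the
// old ZSink/Schedule-based offset aggregation.
def countAndCommit(counter: Ref[Int]): ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics("bench-topic"), Serde.byteArray, Serde.byteArray)
    .mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(records))
    .runDrain
```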
zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala (6 additions, 11 deletions)

@@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
 }
 import zio.kafka.admin.acl._
 import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
-import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit._
@@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
           .flatMapPar(partitionCount)(_._2)
           .take(count)
-          .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-          .mapConcatZIO { committableRecords =>
-            val records = committableRecords.map(_.record)
-            val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-            offsetBatch.commit.as(records)
-          }
+          .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+          .mapConcatZIO(records => Consumer.commit(records).as(records))
           .runCollect
           .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))

@@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         Consumer
           .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
          .take(count)
-          .foreach(_.offset.commit)
+          .foreach(Consumer.commit)
          .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

       KafkaTestUtils.withAdmin { client =>
@@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         Consumer
           .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
          .take(count)
-          .foreach(_.offset.commit)
+          .foreach(Consumer.commit)
          .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))

       KafkaTestUtils.withAdmin { client =>
@@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     groupInstanceId: Option[String] = None
   ): ZIO[Kafka, Throwable, Unit] = Consumer
     .plainStream(Subscription.topics(topicName), Serde.string, Serde.string)
-    .foreach(_.offset.commit)
+    .foreach(Consumer.commit)
     .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId))

   private def getStableConsumerGroupDescription(
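The first AdminSpec hunk above collapses the manual batch dance (collect `CommittableRecord`s, split out the records and offsets, build an `OffsetBatch`, commit, re-emit) into a single `mapConcatZIO` over groups of twenty records. A sketch of the batched form, assuming `Consumer.commit` accepts a chunk of records as the diff implies (topic name and counts are illustrative):

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import zio.{ Chunk, ZIO }
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.stream.ZSink

// Group records 20 at a time, commit each group with one call, then re-emit
// the records downstream.
val firstHundredCommitted: ZIO[Consumer, Throwable, Chunk[ConsumerRecord[String, String]]] =
  Consumer
    .plainStream(Subscription.topics("some-topic"), Serde.string, Serde.string)
    .take(100)
    .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
    .mapConcatZIO(records => Consumer.commit(records).as(records))
    .runCollect
```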
zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala (12 additions, 11 deletions)

@@ -1,11 +1,13 @@
 package zio.kafka

+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import zio._
 import zio.kafka.admin.AdminClient.NewTopic
 import zio.kafka.consumer._
+import zio.kafka.consumer.types.{ Offset, OffsetBatch }
 import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort }
 import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer }
 import zio.kafka.serde.Serde
@@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
   def withConsumerInt(
     subscription: Subscription,
     settings: ConsumerSettings
-  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] =
+  ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] =
     Consumer.make(settings).flatMap { c =>
       c.plainStream(subscription, Serde.string, Serde.int).toQueue()
     }
@@ -192,16 +194,15 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         withConsumer(Topics(Set(topic1)), settings).flatMap { consumer =>
           for {
             messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
-            record = messages
-                       .filter(rec => rec.record.key == key1 && rec.record.value == value1)
+            record = messages.filter(rec => rec.key == key1 && rec.value == value1)
           } yield record
         }
       }
       record2 <- ZIO.scoped {
         withConsumer(Topics(Set(topic2)), settings).flatMap { consumer =>
           for {
             messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2)
+            record = messages.filter(rec => rec.key == key2 && rec.value == value2)
           } yield record
         }
       }
@@ -289,7 +290,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             messages <- consumer.take
                           .flatMap(_.done)
                           .mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == "bob")
+            record = messages.filter(_.key == "bob")
           } yield record
         }
       }
@@ -329,7 +330,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             messages <- consumer.take
                           .flatMap(_.done)
                           .mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == "bob")
+            record = messages.filter(_.key == "bob")
           } yield record
         }
       }
@@ -417,7 +418,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             messages <- consumer.take
                           .flatMap(_.done)
                           .mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == "no one")
+            record = messages.filter(_.key == "no one")
           } yield record

         }
@@ -458,7 +459,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
             messages <- consumer.take
                           .flatMap(_.done)
                           .mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == "no one")
+            record = messages.filter(_.key == "no one")
           } yield record
         }
       }
@@ -504,7 +505,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
            messages <- consumer.take
                          .flatMap(_.done)
                          .mapError(_.getOrElse(new NoSuchElementException))
-            record = messages.filter(rec => rec.record.key == "no one")
+            record = messages.filter(_.key == "no one")
           } yield record
         }
       }
@@ -544,7 +545,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 aliceAccountFeesPaid,
                 Serde.string,
                 Serde.int,
-                Some(aliceHadMoneyCommittableMessage.offset)
+                Some(Offset.from(aliceHadMoneyCommittableMessage))
               )
             }
           }
@@ -591,7 +592,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
                 aliceAccountFeesPaid,
                 Serde.string,
                 Serde.int,
-                Some(aliceHadMoneyCommittableMessage.offset)
+                Some(Offset.from(aliceHadMoneyCommittableMessage))
              ) *>
              t.abort
            }
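In the transactional tests, the offset handed to `Transaction#produce` is now derived from the consumed record with `Offset.from` (from the new `zio.kafka.consumer.types` module) rather than read off a `CommittableRecord`. A sketch of the changed call, with hypothetical record names and type parameters, assuming `Transaction#produce` keeps its current shape apart from the new `Offset` type:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio.ZIO
import zio.kafka.consumer.types.Offset
import zio.kafka.producer.Transaction
import zio.kafka.serde.Serde

// Tie the consumed record's offset to the transaction: if the transaction
// aborts, the offset commit is rolled back along with the produced record.
def produceWithOffset(
  t: Transaction,
  consumed: ConsumerRecord[String, Int],
  out: ProducerRecord[String, Int]
): ZIO[Any, Throwable, RecordMetadata] =
  t.produce(out, Serde.string, Serde.int, Some(Offset.from(consumed)))
```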