Commit 79ccb79
Merge branch 'series/2.x' into proposal-extenalize-schema-registry-client
geirolz authored Apr 7, 2022
2 parents 8e63d55 + 64eaa1d commit 79ccb79
Showing 40 changed files with 1,339 additions and 831 deletions.
36 changes: 27 additions & 9 deletions .github/workflows/ci.yml
@@ -23,19 +23,28 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-latest]
-        scala: [2.12.15, 2.13.8, 3.1.0]
-        java: [adopt@1.8]
+        scala: [2.12.15, 2.13.8, 3.1.1]
+        java: [temurin@8, temurin@17]
     runs-on: ${{ matrix.os }}
     steps:
       - name: Checkout current branch (full)
        uses: actions/checkout@v2
        with:
          fetch-depth: 0

-      - name: Setup Java and Scala
-        uses: olafurpg/setup-scala@v13
+      - name: Setup Java (temurin@8)
+        if: matrix.java == 'temurin@8'
+        uses: actions/setup-java@v2
         with:
-          java-version: ${{ matrix.java }}
+          distribution: temurin
+          java-version: 8
+
+      - name: Setup Java (temurin@17)
+        if: matrix.java == 'temurin@17'
+        uses: actions/setup-java@v2
+        with:
+          distribution: temurin
+          java-version: 17

       - name: Cache sbt
         uses: actions/cache@v2
@@ -65,18 +74,27 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         scala: [2.13.8]
-        java: [adopt@1.8]
+        java: [temurin@8]
     runs-on: ${{ matrix.os }}
     steps:
       - name: Checkout current branch (full)
        uses: actions/checkout@v2
        with:
          fetch-depth: 0

-      - name: Setup Java and Scala
-        uses: olafurpg/setup-scala@v13
+      - name: Setup Java (temurin@8)
+        if: matrix.java == 'temurin@8'
+        uses: actions/setup-java@v2
+        with:
+          distribution: temurin
+          java-version: 8
+
+      - name: Setup Java (temurin@17)
+        if: matrix.java == 'temurin@17'
+        uses: actions/setup-java@v2
         with:
-          java-version: ${{ matrix.java }}
+          distribution: temurin
+          java-version: 17

       - name: Cache sbt
         uses: actions/cache@v2
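Note that this workflow is generated by sbt-github-actions rather than edited by hand, so the switch from `olafurpg/setup-scala` to per-JDK `actions/setup-java` steps follows from a one-line change in the build definition, visible in the `build.sbt` diff below:

```scala
// From the build.sbt diff below: each JavaSpec becomes a matrix entry
// and a conditional "Setup Java" step in the generated ci.yml.
ThisBuild / githubWorkflowJavaVersions := Seq(JavaSpec.temurin("8"), JavaSpec.temurin("17"))
```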
2 changes: 2 additions & 0 deletions .gitignore
@@ -6,3 +6,5 @@
 target/
 .metals/
 .vscode/
+.bloop/
+metals.sbt
11 changes: 11 additions & 0 deletions .scala-steward.conf
@@ -0,0 +1,11 @@
+pullRequests.frequency = "14 days"
+
+updates.pin = [{
+  groupId = "org.typelevel",
+  artifactId="cats-effect",
+  version = "2."
+}, {
+  groupId = "co.fs2",
+  artifactId="fs2-core",
+  version = "2."
+}]
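For context: a Scala Steward `updates.pin` entry with `version = "2."` keeps update pull requests for that artifact within the given version prefix, so cats-effect and fs2-core stay on their 2.x series, while `pullRequests.frequency` throttles update PRs to one round every 14 days.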
39 changes: 26 additions & 13 deletions build.sbt
@@ -1,24 +1,22 @@
-val catsEffectVersion = "3.3.4"
-
-val catsVersion = "2.6.1"
+val catsEffectVersion = "3.3.9"

 val confluentVersion = "6.2.2"

-val fs2Version = "3.2.4"
+val fs2Version = "3.2.5"

 val kafkaVersion = "2.8.1"

-val testcontainersScalaVersion = "0.39.12"
+val testcontainersScalaVersion = "0.40.4"

-val vulcanVersion = "1.7.1"
+val vulcanVersion = "1.8.0"

 val munitVersion = "0.7.29"

 val scala212 = "2.12.15"

 val scala213 = "2.13.8"

-val scala3 = "3.1.0"
+val scala3 = "3.1.1"

 lazy val `fs2-kafka` = project
   .in(file("."))
@@ -118,14 +116,12 @@ lazy val docs = project
 lazy val dependencySettings = Seq(
   resolvers += "confluent" at "https://packages.confluent.io/maven/",
   libraryDependencies ++= Seq(
-    ("com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion)
-      .cross(CrossVersion.for3Use2_13),
-    ("com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion)
-      .cross(CrossVersion.for3Use2_13),
+    "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion,
+    "com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion,
     "org.typelevel" %% "discipline-scalatest" % "2.1.5",
     "org.typelevel" %% "cats-effect-laws" % catsEffectVersion,
     "org.typelevel" %% "cats-effect-testkit" % catsEffectVersion,
-    "ch.qos.logback" % "logback-classic" % "1.2.10"
+    "ch.qos.logback" % "logback-classic" % "1.2.11"
   ).map(_ % Test),
   libraryDependencies ++= {
     if (scalaVersion.value.startsWith("3")) Nil
@@ -228,6 +224,8 @@ ThisBuild / githubWorkflowBuild := Seq(

 ThisBuild / githubWorkflowArtifactUpload := false

+ThisBuild / githubWorkflowJavaVersions := Seq(JavaSpec.temurin("8"), JavaSpec.temurin("17"))
+
 ThisBuild / githubWorkflowTargetTags ++= Seq("v*")
 ThisBuild / githubWorkflowPublishTargetBranches :=
   Seq(RefPredicate.StartsWith(Ref.Tag("v")))
@@ -299,7 +297,22 @@ lazy val mimaSettings = Seq(
      ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.copy"),
      ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.this"),
      ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.apply"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.deleteConsumerGroups")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaAdminClient.deleteConsumerGroups"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.produce"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.KafkaProducerConnection.metrics"),
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("fs2.kafka.KafkaConsumer.committed"),
+
+      // package-private
+      ProblemFilters.exclude[DirectMissingMethodProblem]("fs2.kafka.KafkaProducer.from"),
+
+      // sealed
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
+      ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
+
+      // private
+      ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*")
    )
    // format: on
 }
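The new MiMa exclusions are grouped by why each change is binary-compatible in practice: `ReversedMissingMethodProblem` flags a method added to a trait, which only breaks code that implements the trait, so it can be ignored for sealed types like `ConsumerSettings`; likewise, changes to package-private and private members such as `AvroSettingsImpl` cannot be referenced by user code in the first place.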
70 changes: 29 additions & 41 deletions docs/src/main/mdoc/consumers.md
@@ -157,13 +157,9 @@ In addition, there are several settings specific to the library.
 Once [`ConsumerSettings`][consumersettings] is defined, use `KafkaConsumer.stream` to create a [`KafkaConsumer`][kafkaconsumer] instance.

 ```scala mdoc:silent
-object ConsumerExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
-    val stream =
-      KafkaConsumer.stream(consumerSettings)
-
-    stream.compile.drain.as(ExitCode.Success)
-  }
+object ConsumerExample extends IOApp.Simple {
+  val run: IO[Unit] =
+    KafkaConsumer.stream(consumerSettings).compile.drain
 }
 ```

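A recurring change in this file is `IOApp` → `IOApp.Simple`. As a reference point, here is a minimal sketch of the difference in cats-effect 3 (the `Hello` objects are illustrative, not part of the docs):

```scala
import cats.effect.{ExitCode, IO, IOApp}

// IOApp gives access to command-line args and requires an ExitCode:
object HelloClassic extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    IO(println("hello")).as(ExitCode.Success)
}

// IOApp.Simple drops both, which is why the examples here can
// shed the trailing `.as(ExitCode.Success)`:
object HelloSimple extends IOApp.Simple {
  val run: IO[Unit] =
    IO(println("hello"))
}
```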
@@ -176,14 +172,11 @@ In the example above, we simply create the consumer and then immediately shutdow
 We can use `subscribe` with a non-empty collection of topics, or `subscribeTo` for varargs support. There is also an option to `subscribe` using a `Regex` regular expression for the topic names, in case the exact topic names are not known up-front. When allocating a consumer in a `Stream` context, these are available as extension methods directly on the `Stream`.

 ```scala mdoc:silent
-object ConsumerSubscribeExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
-    val stream =
-      KafkaConsumer.stream(consumerSettings)
-        .subscribeTo("topic")
-
-    stream.compile.drain.as(ExitCode.Success)
-  }
+object ConsumerSubscribeExample extends IOApp.Simple {
+  val run: IO[Unit] =
+    KafkaConsumer.stream(consumerSettings)
+      .subscribeTo("topic")
+      .compile.drain
 }
 ```

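The paragraph above also mentions subscribing via `Regex`. A minimal sketch of that variant, assuming the docs page's earlier imports and `consumerSettings`, and using the consumer's `subscribe(regex)` with a made-up pattern:

```scala
import scala.util.matching.Regex

object ConsumerSubscribeRegexExample extends IOApp.Simple {
  // Hypothetical pattern: match every topic whose name starts with "topic-".
  val pattern: Regex = "topic-.*".r

  val run: IO[Unit] =
    KafkaConsumer.stream(consumerSettings)
      .evalTap(_.subscribe(pattern))
      .compile.drain
}
```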
@@ -194,15 +187,12 @@ Note that only automatic partition assignment is supported. Like in the [consume
 Once subscribed to at least one topic, we can use `stream` for a `Stream` of [`CommittableConsumerRecord`][committableconsumerrecord]s. Each record contains a deserialized [`ConsumerRecord`][consumerrecord], as well as a [`CommittableOffset`][committableoffset] for managing [offset commits](#offset-commits). Streams guarantee records in topic-partition order, but not ordering across partitions. This is the same ordering guarantee that Kafka provides.

 ```scala mdoc:silent
-object ConsumerStreamExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
-    val stream =
-      KafkaConsumer.stream(consumerSettings)
-        .subscribeTo("topic")
-        .records
-
-    stream.compile.drain.as(ExitCode.Success)
-  }
+object ConsumerStreamExample extends IOApp.Simple {
+  val run: IO[Unit] =
+    KafkaConsumer.stream(consumerSettings)
+      .subscribeTo("topic")
+      .records
+      .compile.drain
 }
 ```

@@ -211,8 +201,8 @@ Note that this is an infinite stream, meaning it will only terminate if it's int
 When using `stream`, records on all assigned partitions end up in the same `Stream`. Depending on how records are processed, we might want to separate records per topic-partition. This exact functionality is provided by `partitionedStream`.

 ```scala mdoc:silent
-object ConsumerPartitionedStreamExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
+object ConsumerPartitionedStreamExample extends IOApp.Simple {
+  val run: IO[Unit] = {
     def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
       IO(println(s"Processing record: $record"))

@@ -228,7 +218,7 @@ object ConsumerPartitionedStreamExample extends IOApp {
         }
         .parJoinUnbounded

-    stream.compile.drain.as(ExitCode.Success)
+    stream.compile.drain
   }
 }
 ```
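The diff elides the middle of this example. A plausible reconstruction of the whole pipeline, assuming a `partitionedRecords` method and building on the visible `parJoinUnbounded` fragment:

```scala
object ConsumerPartitionedStreamSketch extends IOApp.Simple {
  val run: IO[Unit] = {
    def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
      IO(println(s"Processing record: $record"))

    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .partitionedRecords   // one inner Stream per assigned partition
      .map(_.evalMap(committable => processRecord(committable.record)))
      .parJoinUnbounded     // run the partition streams concurrently
      .compile
      .drain
  }
}
```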
@@ -252,8 +242,8 @@ Note, that partition streams for revoked partitions will be closed after the new
 When processing of records is independent of each other, as is the case with `processRecord` above, it's often easier and more performant to use `stream` and `mapAsync`, as seen in the example below. Generally, it's crucial to ensure there are no data races between processing of any two records.

 ```scala mdoc:silent
-object ConsumerMapAsyncExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
+object ConsumerMapAsyncExample extends IOApp.Simple {
+  val run: IO[Unit] = {
     def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
       IO(println(s"Processing record: $record"))

@@ -265,7 +255,7 @@ object ConsumerMapAsyncExample extends IOApp {
           processRecord(committable.record)
         }

-    stream.compile.drain.as(ExitCode.Success)
+    stream.compile.drain
   }
 }
 ```
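Again the middle of the example is elided. A sketch of the complete pipeline, with an illustrative concurrency bound of 25 (the bound is not taken from the diff):

```scala
object ConsumerMapAsyncSketch extends IOApp.Simple {
  val run: IO[Unit] = {
    def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
      IO(println(s"Processing record: $record"))

    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      // mapAsync runs up to 25 effects concurrently but still emits
      // results in input order, preserving offset ordering downstream.
      .mapAsync(25)(committable => processRecord(committable.record))
      .compile
      .drain
  }
}
```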
@@ -279,8 +269,8 @@ Offset commits are usually done in batches for performance reasons. We normally
 We should keep the [`CommittableOffset`][committableoffset] in our `Stream` once we've finished processing the record. For at-least-once delivery, it's essential that offset commits preserve topic-partition ordering, so we have to make sure we keep offsets in the same order as we receive them. There are then several functions available for common batch committing scenarios, like `commitBatch`, `commitBatchOption`, and `commitBatchWithin`.

 ```scala mdoc:silent
-object ConsumerCommitBatchExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
+object ConsumerCommitBatchExample extends IOApp.Simple {
+  val run: IO[Unit] = {
     def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
       IO(println(s"Processing record: $record"))

@@ -294,7 +284,7 @@ object ConsumerCommitBatchExample extends IOApp {
         }
         .through(commitBatchWithin(500, 15.seconds))

-    stream.compile.drain.as(ExitCode.Success)
+    stream.compile.drain
   }
 }
 ```
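Here `commitBatchWithin(500, 15.seconds)` commits the latest offset per topic-partition once 500 offsets have accumulated or 15 seconds have elapsed, whichever comes first. A sketch of the elided pipeline above, assuming the processing stage returns each record's offset:

```scala
import scala.concurrent.duration._

object ConsumerCommitBatchSketch extends IOApp.Simple {
  val run: IO[Unit] = {
    def processRecord(record: ConsumerRecord[String, String]): IO[Unit] =
      IO(println(s"Processing record: $record"))

    KafkaConsumer.stream(consumerSettings)
      .subscribeTo("topic")
      .records
      .evalMap { committable =>
        // Keep the offset after processing so it can be committed below.
        processRecord(committable.record).as(committable.offset)
      }
      .through(commitBatchWithin(500, 15.seconds))
      .compile
      .drain
  }
}
```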
@@ -312,8 +302,8 @@ If we're sure we need to commit every offset, we can `commit` individual [`Commi
 With the fs2-kafka you could gracefully shutdown a `KafkaConsumer`. Consider this example:

 ```scala mdoc:silent
-object NoGracefulShutdownExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
+object NoGracefulShutdownExample extends IOApp.Simple {
+  val run: IO[Unit] = {
     def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
       IO(println(s"Processing record: $record"))

@@ -323,9 +313,7 @@ object NoGracefulShutdownExample extends IOApp {
       }.through(commitBatchWithin(100, 15.seconds)).compile.drain
     }

-    KafkaConsumer.resource(consumerSettings).use { consumer =>
-      run(consumer).as(ExitCode.Success)
-    }
+    KafkaConsumer.resource(consumerSettings).use(run)
   }
 }
 ```
@@ -344,8 +332,8 @@ We could combine `stopConsuming` with the custom resource handling and implement
 ```scala mdoc:silent
 import cats.effect.{Deferred, Ref}

-object WithGracefulShutdownExample extends IOApp {
-  def run(args: List[String]): IO[ExitCode] = {
+object WithGracefulShutdownExample extends IOApp.Simple {
+  val run: IO[Unit] = {
     def processRecord(record: CommittableConsumerRecord[IO, String, String]): IO[Unit] =
       IO(println(s"Processing record: $record"))

@@ -383,7 +371,7 @@ object WithGracefulShutdownExample extends IOApp {
           } yield ()
         }).guarantee(closeConsumer) // [15]
       }
-    } yield ExitCode.Success
+    } yield ()
   }
 }
 ```
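Most of this graceful-shutdown example is elided in the diff. As a minimal illustration of the `stopConsuming` building block it relies on, assuming the page's earlier imports and `consumerSettings` (the 30-second trigger is artificial):

```scala
import scala.concurrent.duration._

object StopConsumingSketch extends IOApp.Simple {
  val run: IO[Unit] =
    KafkaConsumer.resource(consumerSettings).use { consumer =>
      val process =
        consumer.records
          .evalMap(committable => IO(println(s"Processing record: ${committable.record}")))
          .compile
          .drain

      // stopConsuming asks the consumer to stop fetching new records;
      // the records stream then completes on its own instead of being
      // cancelled, so in-flight processing can finish before the
      // resource is released.
      consumer.subscribeTo("topic") >>
        IO.both(process, IO.sleep(30.seconds) >> consumer.stopConsuming).void
    }
}
```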